[C#] FASTER Generic IDevice log/commit manager + AzureStorageDevice improvements (#288)

* Creates three generic pluggable interfaces:
  * Naming checkpoints (`ICheckpointNamingScheme`)
  * Factory for devices used in checkpoints (`INamedDeviceFactory`)
  * Pluggable blob management (`IBlobManager`)

* Implements a single general combined log commit & checkpoint manager for any `IDevice` (provided using the factory mentioned above): `DeviceLogCommitCheckpointManager`
* Adds improved `AzureStorageDevice` with user-controlled support for leases via `IBlobManager` interface
* Added checksums for index and log checkpoints (instead of separate `completed.dat` file)
This commit is contained in:
Badrish Chandramouli 2020-08-14 18:16:59 -07:00 коммит произвёл GitHub
Родитель f7d7550d70
Коммит 8e094cd52c
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
36 изменённых файлов: 2042 добавлений и 232 удалений

Просмотреть файл

@ -527,7 +527,7 @@ namespace FASTER.core
// Segment size
LogSegmentSizeBits = settings.SegmentSizeBits;
SegmentSize = 1 << LogSegmentSizeBits;
SegmentSize = 1L << LogSegmentSizeBits;
SegmentBufferSize = 1 + (LogTotalSizeBytes / SegmentSize < 1 ? 1 : (int)(LogTotalSizeBytes / SegmentSize));
if (SegmentSize < PageSize)

Просмотреть файл

@ -14,8 +14,6 @@ namespace FASTER.core
/// This value is supplied for capacity when the device does not have a specified limit.
/// </summary>
public const long CAPACITY_UNSPECIFIED = -1;
private const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
private const string TEST_CONTAINER = "test";
/// <summary>
/// Create a storage device for the log

Просмотреть файл

@ -60,6 +60,9 @@ namespace FASTER.core
: base(filename, GetSectorSize(filename), capacity)
{
Native32.EnableProcessPrivileges();
string path = new FileInfo(filename).Directory.FullName;
if (!Directory.Exists(path))
Directory.CreateDirectory(path);
this.preallocateFile = preallocateFile;
this.deleteOnClose = deleteOnClose;
this.disableFileBuffering = disableFileBuffering;
@ -79,9 +82,9 @@ namespace FASTER.core
string bareName = fi.Name;
List<int> segids = new List<int>();
foreach (FileInfo item in di.GetFiles(bareName + "*"))
foreach (System.IO.FileInfo item in di.GetFiles(bareName + "*"))
{
segids.Add(Int32.Parse(item.Name.Replace(bareName, "").Replace(".", "")));
segids.Add(int.Parse(item.Name.Replace(bareName, "").Replace(".", "")));
}
segids.Sort();

Просмотреть файл

@ -0,0 +1,57 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.IO;
using System.Linq;
namespace FASTER.core
{
/// <summary>
/// Default checkpoint naming scheme used by FASTER.
/// Places index checkpoints, log checkpoints, and FasterLog commits under
/// fixed relative paths below a caller-supplied base location.
/// </summary>
public class DefaultCheckpointNamingScheme : ICheckpointNamingScheme
{
    // Overall location specifier (e.g., local path or cloud container name)
    readonly string location;

    /// <summary>
    /// Create instance of default naming scheme
    /// </summary>
    /// <param name="baseName">Overall location specifier (e.g., local path or cloud container name)</param>
    public DefaultCheckpointNamingScheme(string baseName = "")
        => location = baseName;

    /// <inheritdoc />
    public string BaseName() => location;

    /// <inheritdoc />
    public FileDescriptor LogCheckpointMetadata(Guid token) => new FileDescriptor(LogCheckpointDir(token), "info.dat");

    /// <inheritdoc />
    public FileDescriptor LogSnapshot(Guid token) => new FileDescriptor(LogCheckpointDir(token), "snapshot.dat");

    /// <inheritdoc />
    public FileDescriptor ObjectLogSnapshot(Guid token) => new FileDescriptor(LogCheckpointDir(token), "snapshot.obj.dat");

    /// <inheritdoc />
    public FileDescriptor IndexCheckpointMetadata(Guid token) => new FileDescriptor(IndexCheckpointDir(token), "info.dat");

    /// <inheritdoc />
    public FileDescriptor HashTable(Guid token) => new FileDescriptor(IndexCheckpointDir(token), "ht.dat");

    /// <inheritdoc />
    public FileDescriptor FasterLogCommitMetadata(long commitNumber) => new FileDescriptor(FasterLogCommitBasePath(), $"commit.{commitNumber}");

    /// <inheritdoc />
    public Guid Token(FileDescriptor fileDescriptor)
    {
        // The checkpoint token is encoded as the name of the checkpoint's directory
        var tokenDirName = new DirectoryInfo(fileDescriptor.directoryName).Name;
        return Guid.Parse(tokenDirName);
    }

    /// <inheritdoc />
    public long CommitNumber(FileDescriptor fileDescriptor)
    {
        // Commit files are named "commit.<num>" plus a device segment suffix
        // (e.g., "commit.5.0"); the commit number is the second-to-last
        // dot-separated part, or the only part when there is just one.
        var parts = fileDescriptor.fileName.Split('.');
        return long.Parse(parts[Math.Max(parts.Length - 2, 0)]);
    }

    // Per-token subdirectory for index checkpoints
    private string IndexCheckpointDir(Guid token) => $"{IndexCheckpointBasePath()}/{token}";

    // Per-token subdirectory for log (CPR) checkpoints
    private string LogCheckpointDir(Guid token) => $"{LogCheckpointBasePath()}/{token}";

    /// <inheritdoc />
    public string IndexCheckpointBasePath() => "index-checkpoints";

    /// <inheritdoc />
    public string LogCheckpointBasePath() => "cpr-checkpoints";

    /// <inheritdoc />
    public string FasterLogCommitBasePath() => "log-commits";
}
}

Просмотреть файл

@ -0,0 +1,351 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
namespace FASTER.core
{
/// <summary>
/// Log commit manager for a generic IDevice.
/// Combined FasterLog commit manager (ILogCommitManager) and FASTER checkpoint
/// manager (ICheckpointManager); all metadata is persisted via devices obtained
/// from the supplied device factory, using the supplied naming scheme for paths.
/// </summary>
public class DeviceLogCommitCheckpointManager : ILogCommitManager, ICheckpointManager
{
// Factory used to obtain/delete the IDevice backing each metadata file
private readonly INamedDeviceFactory deviceFactory;
// Maps checkpoint tokens and commit numbers to file descriptors (paths/names)
private readonly ICheckpointNamingScheme checkpointNamingScheme;
// Signals completion of the single outstanding async device I/O (released in IOCallback)
// NOTE(review): assumes no concurrent ReadInto/WriteInto calls share this instance -- confirm
private readonly SemaphoreSlim semaphore;
// If true, a single device/file is reused (overwritten) for all FasterLog commits
private readonly bool overwriteLogCommits;
// If true, older FasterLog commit files are deleted as newer ones are committed
private readonly bool removeOutdated;
// Lazily created pool providing sector-aligned buffers for device I/O
private SectorAlignedBufferPool bufferPool;
// Cached commit device when overwriteLogCommits is true; closed in Dispose
private IDevice singleLogCommitDevice;
/// <summary>
/// Next commit number
/// </summary>
private long commitNum;
/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="checkpointNamingScheme">Checkpoint naming helper</param>
/// <param name="overwriteLogCommits">Overwrite same FASTER log commits each time</param>
/// <param name="removeOutdated">Remove older FASTER log commits</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, ICheckpointNamingScheme checkpointNamingScheme, bool overwriteLogCommits = true, bool removeOutdated = false)
{
this.deviceFactory = deviceFactory;
this.checkpointNamingScheme = checkpointNamingScheme;
this.commitNum = 0;
// Initial count 0: Wait() blocks until IOCallback releases it after each I/O completes
this.semaphore = new SemaphoreSlim(0);
this.overwriteLogCommits = overwriteLogCommits;
this.removeOutdated = removeOutdated;
deviceFactory.Initialize(checkpointNamingScheme.BaseName());
}
/// <inheritdoc />
public void PurgeAll()
{
// Empty directory name => delete everything under the factory's base location
deviceFactory.Delete(new FileDescriptor { directoryName = "" });
}
/// <summary>
/// Create new instance of log commit manager
/// </summary>
/// <param name="deviceFactory">Factory for getting devices</param>
/// <param name="baseName">Overall location specifier (e.g., local path or cloud container name)</param>
/// <param name="overwriteLogCommits">Overwrite same FASTER log commits each time</param>
/// <param name="removeOutdated">Remove older FASTER log commits</param>
public DeviceLogCommitCheckpointManager(INamedDeviceFactory deviceFactory, string baseName, bool overwriteLogCommits = true, bool removeOutdated = false)
: this(deviceFactory, new DefaultCheckpointNamingScheme(baseName), overwriteLogCommits, removeOutdated)
{ }
#region ILogCommitManager
/// <summary>
/// Persist a FasterLog commit record: [length][metadata bytes], written at offset 0
/// of the next commit device.
/// </summary>
/// <param name="beginAddress">Begin address of log (unused here; encoded in metadata by caller)</param>
/// <param name="untilAddress">Commit-until address (unused here; encoded in metadata by caller)</param>
/// <param name="commitMetadata">Serialized commit metadata</param>
public unsafe void Commit(long beginAddress, long untilAddress, byte[] commitMetadata)
{
var device = NextCommitDevice();
// Two phase to ensure we write metadata in single Write operation
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
// When overwriting, the single cached device stays open until Dispose
if (!overwriteLogCommits)
device.Close();
RemoveOutdated();
}
/// <inheritdoc />
public void Dispose()
{
singleLogCommitDevice?.Close();
singleLogCommitDevice = null;
}
/// <inheritdoc />
public IEnumerable<long> ListCommits()
{
// Newest commit number first (callers try commits in order of preference)
return deviceFactory.ListContents(checkpointNamingScheme.FasterLogCommitBasePath()).Select(e => checkpointNamingScheme.CommitNumber(e)).OrderByDescending(e => e);
}
/// <inheritdoc />
public byte[] GetCommitMetadata(long commitNum)
{
IDevice device;
if (overwriteLogCommits)
{
if (singleLogCommitDevice == null)
singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
device = singleLogCommitDevice;
}
else
{
device = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
// Resume numbering after the commit being recovered
this.commitNum = commitNum + 1;
}
// First read just the length prefix (sector-aligned read may return more)
ReadInto(device, 0, out byte[] writePad, sizeof(int));
int size = BitConverter.ToInt32(writePad, 0);
byte[] body;
// If the aligned first read already covered the full record, reuse it;
// otherwise re-read with the now-known full size
if (writePad.Length >= size + sizeof(int))
body = writePad;
else
ReadInto(device, 0, out body, size + sizeof(int));
if (!overwriteLogCommits)
device.Close();
// Strip the 4-byte length prefix
return new Span<byte>(body).Slice(sizeof(int)).ToArray();
}
// Device for the next commit: the cached single device when overwriting,
// otherwise a fresh device for commit number commitNum (then incremented)
private IDevice NextCommitDevice()
{
if (overwriteLogCommits)
{
if (singleLogCommitDevice == null)
singleLogCommitDevice = deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum));
return singleLogCommitDevice;
}
return deviceFactory.Get(checkpointNamingScheme.FasterLogCommitMetadata(commitNum++));
}
// Delete the commit two generations back, keeping the latest two commits
private void RemoveOutdated()
{
if (removeOutdated && commitNum > 1)
deviceFactory.Delete(checkpointNamingScheme.FasterLogCommitMetadata(commitNum - 2));
}
#endregion
#region ICheckpointManager
/// <summary>
/// Persist index-checkpoint metadata as [length][metadata bytes] at offset 0.
/// </summary>
/// <param name="indexToken">Index checkpoint token</param>
/// <param name="commitMetadata">Serialized checkpoint metadata</param>
public unsafe void CommitIndexCheckpoint(Guid indexToken, byte[] commitMetadata)
{
var device = NextIndexCheckpointDevice(indexToken);
// Two phase to ensure we write metadata in single Write operation
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
device.Close();
}
/// <inheritdoc />
public IEnumerable<Guid> GetIndexCheckpointTokens()
{
return deviceFactory.ListContents(checkpointNamingScheme.IndexCheckpointBasePath()).Select(e => checkpointNamingScheme.Token(e));
}
/// <inheritdoc />
public byte[] GetIndexCheckpointMetadata(Guid indexToken)
{
var device = deviceFactory.Get(checkpointNamingScheme.IndexCheckpointMetadata(indexToken));
// Length-prefixed read; same two-step pattern as GetCommitMetadata
ReadInto(device, 0, out byte[] writePad, sizeof(int));
int size = BitConverter.ToInt32(writePad, 0);
byte[] body;
if (writePad.Length >= size + sizeof(int))
body = writePad;
else
ReadInto(device, 0, out body, size + sizeof(int));
device.Close();
return new Span<byte>(body).Slice(sizeof(int)).ToArray();
}
/// <summary>
/// Persist log-checkpoint metadata as [length][metadata bytes] at offset 0.
/// </summary>
/// <param name="logToken">Log checkpoint token</param>
/// <param name="commitMetadata">Serialized checkpoint metadata</param>
public unsafe void CommitLogCheckpoint(Guid logToken, byte[] commitMetadata)
{
var device = NextLogCheckpointDevice(logToken);
// Two phase to ensure we write metadata in single Write operation
using var ms = new MemoryStream();
using var writer = new BinaryWriter(ms);
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
WriteInto(device, 0, ms.ToArray(), (int)ms.Position);
device.Close();
}
/// <inheritdoc />
public IEnumerable<Guid> GetLogCheckpointTokens()
{
return deviceFactory.ListContents(checkpointNamingScheme.LogCheckpointBasePath()).Select(e => checkpointNamingScheme.Token(e));
}
/// <inheritdoc />
public byte[] GetLogCheckpointMetadata(Guid logToken)
{
var device = deviceFactory.Get(checkpointNamingScheme.LogCheckpointMetadata(logToken));
// Length-prefixed read; same two-step pattern as GetCommitMetadata
ReadInto(device, 0, out byte[] writePad, sizeof(int));
int size = BitConverter.ToInt32(writePad, 0);
byte[] body;
if (writePad.Length >= size + sizeof(int))
body = writePad;
else
ReadInto(device, 0, out body, size + sizeof(int));
device.Close();
return new Span<byte>(body).Slice(sizeof(int)).ToArray();
}
/// <inheritdoc />
public IDevice GetIndexDevice(Guid indexToken)
{
return deviceFactory.Get(checkpointNamingScheme.HashTable(indexToken));
}
/// <inheritdoc />
public IDevice GetSnapshotLogDevice(Guid token)
{
return deviceFactory.Get(checkpointNamingScheme.LogSnapshot(token));
}
/// <inheritdoc />
public IDevice GetSnapshotObjectLogDevice(Guid token)
{
return deviceFactory.Get(checkpointNamingScheme.ObjectLogSnapshot(token));
}
/// <inheritdoc />
public void InitializeIndexCheckpoint(Guid indexToken)
{
// No-op: devices are created lazily via the factory when first written
}
/// <inheritdoc />
public void InitializeLogCheckpoint(Guid logToken)
{
// No-op: devices are created lazily via the factory when first written
}
// Device for the index-checkpoint metadata of the given token;
// outdated-checkpoint removal is not implemented yet
private IDevice NextIndexCheckpointDevice(Guid token)
{
if (!removeOutdated)
{
return deviceFactory.Get(checkpointNamingScheme.IndexCheckpointMetadata(token));
}
throw new NotImplementedException();
}
// Device for the log-checkpoint metadata of the given token;
// outdated-checkpoint removal is not implemented yet
private IDevice NextLogCheckpointDevice(Guid token)
{
if (!removeOutdated)
{
return deviceFactory.Get(checkpointNamingScheme.LogCheckpointMetadata(token));
}
throw new NotImplementedException();
}
#endregion
// Completion callback for async device I/O: logs errors, wakes the waiting
// caller, and frees the native overlapped structure
private unsafe void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* overlapped)
{
try
{
if (errorCode != 0)
{
Trace.TraceError("OverlappedStream GetQueuedCompletionStatus error: {0}", errorCode);
}
semaphore.Release();
}
finally
{
Overlapped.Free(overlapped);
}
}
/// <summary>
/// Synchronously read at least <paramref name="size"/> bytes from the device.
/// Note: will read potentially more data (based on sector alignment)
/// </summary>
/// <param name="device">Device to read from</param>
/// <param name="address">Device address to start reading at</param>
/// <param name="buffer">Receives the data read (length rounded up to sector size)</param>
/// <param name="size">Minimum number of bytes required</param>
private unsafe void ReadInto(IDevice device, ulong address, out byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
long numBytesToRead = size;
// Round the read size up to a multiple of the device sector size
numBytesToRead = ((numBytesToRead + (device.SectorSize - 1)) & ~(device.SectorSize - 1));
var pbuffer = bufferPool.Get((int)numBytesToRead);
device.ReadAsync(address, (IntPtr)pbuffer.aligned_pointer,
(uint)numBytesToRead, IOCallback, null);
// Block until IOCallback signals completion (makes this call synchronous)
semaphore.Wait();
buffer = new byte[numBytesToRead];
fixed (byte* bufferRaw = buffer)
Buffer.MemoryCopy(pbuffer.aligned_pointer, bufferRaw, numBytesToRead, numBytesToRead);
pbuffer.Return();
}
/// <summary>
/// Synchronously write <paramref name="size"/> bytes from the buffer to the device.
/// Note: pads the bytes with zeros to achieve sector alignment
/// </summary>
/// <param name="device">Device to write to</param>
/// <param name="address">Device address to start writing at</param>
/// <param name="buffer">Data to write</param>
/// <param name="size">Number of valid bytes in <paramref name="buffer"/></param>
private unsafe void WriteInto(IDevice device, ulong address, byte[] buffer, int size)
{
if (bufferPool == null)
bufferPool = new SectorAlignedBufferPool(1, (int)device.SectorSize);
long numBytesToWrite = size;
// Round the write size up to a multiple of the device sector size
numBytesToWrite = ((numBytesToWrite + (device.SectorSize - 1)) & ~(device.SectorSize - 1));
var pbuffer = bufferPool.Get((int)numBytesToWrite);
fixed (byte* bufferRaw = buffer)
{
Buffer.MemoryCopy(bufferRaw, pbuffer.aligned_pointer, size, size);
}
device.WriteAsync((IntPtr)pbuffer.aligned_pointer, address, (uint)numBytesToWrite, IOCallback, null);
// Block until IOCallback signals completion (makes this call synchronous)
semaphore.Wait();
pbuffer.Return();
}
}
}

Просмотреть файл

@ -0,0 +1,98 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.CodeDom;
using System.Collections.Generic;
using System.IO;
using System.Linq;
namespace FASTER.core
{
/// <summary>
/// Interface to provide paths and names for all checkpoint-related files
/// </summary>
public interface ICheckpointNamingScheme
{
/// <summary>
/// Base (or container) name for all checkpoint files
/// </summary>
/// <returns>Base name or container under which all checkpoint files live</returns>
public string BaseName();
/// <summary>
/// Hash table (including overflow buckets)
/// </summary>
/// <param name="token">Index checkpoint token</param>
/// <returns>Descriptor of the hash table file for the given checkpoint</returns>
FileDescriptor HashTable(Guid token);
/// <summary>
/// Index checkpoint metadata
/// </summary>
/// <param name="token">Index checkpoint token</param>
/// <returns>Descriptor of the index checkpoint metadata file</returns>
FileDescriptor IndexCheckpointMetadata(Guid token);
/// <summary>
/// Hybrid log checkpoint metadata
/// </summary>
/// <param name="token">Log checkpoint token</param>
/// <returns>Descriptor of the log checkpoint metadata file</returns>
FileDescriptor LogCheckpointMetadata(Guid token);
/// <summary>
/// Hybrid log snapshot
/// </summary>
/// <param name="token">Log checkpoint token</param>
/// <returns>Descriptor of the log snapshot file</returns>
FileDescriptor LogSnapshot(Guid token);
/// <summary>
/// Object log snapshot
/// </summary>
/// <param name="token">Log checkpoint token</param>
/// <returns>Descriptor of the object log snapshot file</returns>
FileDescriptor ObjectLogSnapshot(Guid token);
/// <summary>
/// FasterLog commit metadata
/// </summary>
/// <param name="commitNumber">Commit number</param>
/// <returns>Descriptor of the FasterLog commit metadata file</returns>
FileDescriptor FasterLogCommitMetadata(long commitNumber);
/// <summary>
/// Token associated with given file descriptor
/// </summary>
/// <param name="fileDescriptor">File descriptor produced by this scheme</param>
/// <returns>Checkpoint token encoded in the descriptor</returns>
Guid Token(FileDescriptor fileDescriptor);
/// <summary>
/// Commit number associated with given file descriptor
/// </summary>
/// <param name="fileDescriptor">File descriptor produced by this scheme</param>
/// <returns>FasterLog commit number encoded in the descriptor</returns>
long CommitNumber(FileDescriptor fileDescriptor);
/// <summary>
/// Get base path holding index checkpoints
/// </summary>
/// <returns>Relative path under which index checkpoints are stored</returns>
string IndexCheckpointBasePath();
/// <summary>
/// Get base path holding log checkpoints
/// </summary>
/// <returns>Relative path under which log checkpoints are stored</returns>
string LogCheckpointBasePath();
/// <summary>
/// Get base path holding FasterLog commits
/// </summary>
/// <returns>Relative path under which FasterLog commits are stored</returns>
string FasterLogCommitBasePath();
}
}

Просмотреть файл

@ -0,0 +1,40 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
namespace FASTER.core
{
/// <summary>
/// Factory for getting IDevice instances for checkpointing
/// </summary>
public interface INamedDeviceFactory
{
/// <summary>
/// Initialize base name or container
/// </summary>
/// <param name="baseName">Base name or container</param>
void Initialize(string baseName);
/// <summary>
/// Get IDevice instance for given file info
/// </summary>
/// <param name="fileInfo">File info</param>
/// <returns>Device backed by the described file; caller is responsible for closing it</returns>
IDevice Get(FileDescriptor fileInfo);
/// <summary>
/// Delete IDevice for given file info
/// </summary>
/// <param name="fileInfo">File info; a null fileName denotes the whole directory</param>
void Delete(FileDescriptor fileInfo);
/// <summary>
/// List path contents, in order of preference
/// </summary>
/// <param name="path">Relative path (under the base name) to list</param>
/// <returns>Descriptors of the entries found, most preferred first</returns>
IEnumerable<FileDescriptor> ListContents(string path);
}
}

Просмотреть файл

@ -0,0 +1,80 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security;
namespace FASTER.core
{
/// <summary>
/// Local storage device factory: creates local-file-backed IDevice instances
/// rooted at the base path supplied to <see cref="Initialize"/>.
/// </summary>
public class LocalStorageNamedDeviceFactory : INamedDeviceFactory
{
    // Root directory under which all devices are created (set by Initialize)
    private string baseName;
    private readonly bool deleteOnClose;
    private readonly bool preallocateFile;

    /// <summary>
    /// Create instance of factory
    /// </summary>
    /// <param name="preallocateFile">Whether files should be preallocated</param>
    /// <param name="deleteOnClose">Whether file should be deleted on close</param>
    public LocalStorageNamedDeviceFactory(bool preallocateFile = false, bool deleteOnClose = false)
    {
        this.preallocateFile = preallocateFile;
        this.deleteOnClose = deleteOnClose;
    }

    /// <inheritdoc />
    public void Initialize(string baseName)
    {
        this.baseName = baseName;
    }

    /// <inheritdoc />
    public IDevice Get(FileDescriptor fileInfo)
    {
        return Devices.CreateLogDevice(Path.Combine(baseName, fileInfo.directoryName, fileInfo.fileName), preallocateFile: preallocateFile, deleteOnClose: deleteOnClose);
    }

    /// <inheritdoc />
    public IEnumerable<FileDescriptor> ListContents(string path)
    {
        var pathInfo = new DirectoryInfo(Path.Combine(baseName, path));
        if (pathInfo.Exists)
        {
            // Newest-first, so callers preferring the most recent entry see it first
            foreach (var folder in pathInfo.GetDirectories().OrderByDescending(f => f.LastWriteTime))
            {
                yield return new FileDescriptor(folder.Name, "");
            }
            foreach (var file in pathInfo.GetFiles().OrderByDescending(f => f.LastWriteTime))
            {
                yield return new FileDescriptor("", file.Name);
            }
        }
    }

    /// <inheritdoc />
    public void Delete(FileDescriptor fileInfo)
    {
        if (fileInfo.fileName != null)
        {
            // Local devices append a segment suffix to the file name; metadata files
            // written through this factory use a single segment, hence ".0"
            // -- TODO confirm for multi-segment files
            var file = new FileInfo(Path.Combine(baseName, fileInfo.directoryName, fileInfo.fileName + ".0"));
            if (file.Exists)
                file.Delete();
        }
        else
        {
            var dir = new DirectoryInfo(Path.Combine(baseName, fileInfo.directoryName));
            // Guard: DirectoryInfo.Delete throws DirectoryNotFoundException on a
            // missing directory (e.g., PurgeAll on a store that never committed)
            if (dir.Exists)
                dir.Delete(true);
        }
    }
}
}

Просмотреть файл

@ -37,8 +37,7 @@ namespace FASTER.core
/// <summary>
/// Use specified directory for storing and retrieving checkpoints
/// This is a shortcut to providing the following:
/// CheckpointSettings.CheckpointManager = new LocalCheckpointManager(CheckpointDir)
/// using local storage device.
/// </summary>
public string CheckpointDir = null;
}

Просмотреть файл

@ -141,6 +141,8 @@ namespace FASTER.core
/// </summary>
public struct HybridLogRecoveryInfo
{
const int CheckpointVersion = 1;
/// <summary>
/// Guid
/// </summary>
@ -219,6 +221,12 @@ namespace FASTER.core
continueTokens = new ConcurrentDictionary<string, CommitPoint>();
string value = reader.ReadLine();
var cversion = int.Parse(value);
value = reader.ReadLine();
var checksum = long.Parse(value);
value = reader.ReadLine();
guid = Guid.Parse(value);
value = reader.ReadLine();
@ -275,6 +283,12 @@ namespace FASTER.core
objectLogSegmentOffsets[i] = long.Parse(value);
}
}
if (cversion != CheckpointVersion)
throw new FasterException("Invalid version");
if (checksum != Checksum(continueTokens.Count))
throw new FasterException("Invalid checksum for checkpoint");
}
/// <summary>
@ -285,7 +299,7 @@ namespace FASTER.core
/// <returns></returns>
internal void Recover(Guid token, ICheckpointManager checkpointManager)
{
var metadata = checkpointManager.GetLogCommitMetadata(token);
var metadata = checkpointManager.GetLogCheckpointMetadata(token);
if (metadata == null)
throw new FasterException("Invalid log commit metadata for ID " + token.ToString());
@ -302,6 +316,9 @@ namespace FASTER.core
{
using (StreamWriter writer = new StreamWriter(ms))
{
writer.WriteLine(CheckpointVersion); // checkpoint version
writer.WriteLine(Checksum(checkpointTokens.Count)); // checksum
writer.WriteLine(guid);
writer.WriteLine(useSnapshotFile);
writer.WriteLine(version);
@ -335,6 +352,15 @@ namespace FASTER.core
}
}
private readonly long Checksum(int checkpointTokensCount)
{
var bytes = guid.ToByteArray();
var long1 = BitConverter.ToInt64(bytes, 0);
var long2 = BitConverter.ToInt64(bytes, 8);
return long1 ^ long2 ^ version ^ flushedLogicalAddress ^ startLogicalAddress ^ finalLogicalAddress ^ headAddress ^ beginAddress
^ checkpointTokensCount ^ (objectLogSegmentOffsets == null ? 0 : objectLogSegmentOffsets.Length);
}
/// <summary>
/// Print checkpoint info for debugging purposes
/// </summary>
@ -394,6 +420,7 @@ namespace FASTER.core
internal struct IndexRecoveryInfo
{
const int CheckpointVersion = 1;
public Guid token;
public long table_size;
public ulong num_ht_bytes;
@ -416,6 +443,12 @@ namespace FASTER.core
public void Initialize(StreamReader reader)
{
string value = reader.ReadLine();
var cversion = int.Parse(value);
value = reader.ReadLine();
var checksum = long.Parse(value);
value = reader.ReadLine();
token = Guid.Parse(value);
value = reader.ReadLine();
@ -435,11 +468,17 @@ namespace FASTER.core
value = reader.ReadLine();
finalLogicalAddress = long.Parse(value);
if (cversion != CheckpointVersion)
throw new FasterException("Invalid version");
if (checksum != Checksum())
throw new FasterException("Invalid checksum for checkpoint");
}
public void Recover(Guid guid, ICheckpointManager checkpointManager)
{
var metadata = checkpointManager.GetIndexCommitMetadata(guid);
var metadata = checkpointManager.GetIndexCheckpointMetadata(guid);
if (metadata == null)
throw new FasterException("Invalid index commit metadata for ID " + guid.ToString());
using (StreamReader s = new StreamReader(new MemoryStream(metadata)))
@ -452,6 +491,8 @@ namespace FASTER.core
{
using (var writer = new StreamWriter(ms))
{
writer.WriteLine(CheckpointVersion); // checkpoint version
writer.WriteLine(Checksum()); // checksum
writer.WriteLine(token);
writer.WriteLine(table_size);
@ -465,6 +506,15 @@ namespace FASTER.core
}
}
private readonly long Checksum()
{
var bytes = token.ToByteArray();
var long1 = BitConverter.ToInt64(bytes, 0);
var long2 = BitConverter.ToInt64(bytes, 8);
return long1 ^ long2 ^ table_size ^ (long)num_ht_bytes ^ (long)num_ofb_bytes
^ num_buckets ^ startLogicalAddress ^ finalLogicalAddress;
}
public readonly void DebugPrint()
{
Debug.WriteLine("******** Index Checkpoint Info for {0} ********", token);

Просмотреть файл

@ -4,6 +4,7 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
@ -98,8 +99,24 @@ namespace FASTER.core
throw new FasterException(
"Specify either CheckpointManager or CheckpointDir for CheckpointSettings, not both");
checkpointManager = checkpointSettings.CheckpointManager ??
bool oldCheckpointManager = false;
if (oldCheckpointManager)
{
checkpointManager = checkpointSettings.CheckpointManager ??
new LocalCheckpointManager(checkpointSettings.CheckpointDir ?? "");
}
else
{
checkpointManager = checkpointSettings.CheckpointManager ??
new DeviceLogCommitCheckpointManager
(new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(checkpointSettings.CheckpointDir ?? ".").FullName));
}
if (checkpointSettings.CheckpointManager == null)
disposeCheckpointManager = true;
FoldOverSnapshot = checkpointSettings.CheckPointType == core.CheckpointType.FoldOver;
CopyReadsToTail = logSettings.CopyReadsToTail;
@ -440,6 +457,8 @@ namespace FASTER.core
Free();
hlog.Dispose();
readcache?.Dispose();
if (disposeCheckpointManager)
checkpointManager?.Dispose();
}
}
}

Просмотреть файл

@ -22,6 +22,7 @@ namespace FASTER.core
private readonly BlittableAllocator<Empty, byte> allocator;
private readonly LightEpoch epoch;
private readonly ILogCommitManager logCommitManager;
private readonly bool disposeLogCommitManager;
private readonly GetMemory getMemory;
private readonly int headerSize;
private readonly LogChecksumType logChecksum;
@ -81,9 +82,25 @@ namespace FASTER.core
/// <param name="logSettings"></param>
public FasterLog(FasterLogSettings logSettings)
{
logCommitManager = logSettings.LogCommitManager ??
new LocalLogCommitManager(logSettings.LogCommitFile ??
logSettings.LogDevice.FileName + ".commit");
bool oldCommitManager = false;
if (oldCommitManager)
{
logCommitManager = logSettings.LogCommitManager ??
new LocalLogCommitManager(logSettings.LogCommitFile ??
logSettings.LogDevice.FileName + ".commit");
}
else
{
logCommitManager = logSettings.LogCommitManager ??
new DeviceLogCommitCheckpointManager
(new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
new FileInfo(logSettings.LogDevice.FileName).Directory.FullName));
}
if (logSettings.LogCommitManager == null)
disposeLogCommitManager = true;
// Reserve 8 byte checksum in header if requested
logChecksum = logSettings.LogChecksum;
@ -114,6 +131,8 @@ namespace FASTER.core
commitTcs.TrySetException(new ObjectDisposedException("Log has been disposed"));
allocator.Dispose();
epoch.Dispose();
if (disposeLogCommitManager)
logCommitManager.Dispose();
}
#region Enqueue
@ -806,28 +825,40 @@ namespace FASTER.core
/// </summary>
private void Restore(out Dictionary<string, long> recoveredIterators)
{
recoveredIterators = null;
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
var commitInfo = logCommitManager.GetCommitMetadata();
if (commitInfo == null) return;
using (var r = new BinaryReader(new MemoryStream(commitInfo)))
foreach (var commitNum in logCommitManager.ListCommits())
{
info.Initialize(r);
try
{
var commitInfo = logCommitManager.GetCommitMetadata(commitNum);
FasterLogRecoveryInfo info = new FasterLogRecoveryInfo();
if (commitInfo == null) return;
using (var r = new BinaryReader(new MemoryStream(commitInfo)))
{
info.Initialize(r);
}
var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress);
if (info.BeginAddress > headAddress)
headAddress = info.BeginAddress;
if (headAddress == 0) headAddress = Constants.kFirstValidAddress;
recoveredIterators = info.Iterators;
allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
CommittedUntilAddress = info.FlushedUntilAddress;
CommittedBeginAddress = info.BeginAddress;
return;
}
catch { }
}
var headAddress = info.FlushedUntilAddress - allocator.GetOffsetInPage(info.FlushedUntilAddress);
if (info.BeginAddress > headAddress)
headAddress = info.BeginAddress;
if (headAddress == 0) headAddress = Constants.kFirstValidAddress;
recoveredIterators = info.Iterators;
allocator.RestoreHybridLog(info.FlushedUntilAddress, headAddress, info.BeginAddress);
CommittedUntilAddress = info.FlushedUntilAddress;
CommittedBeginAddress = info.BeginAddress;
Debug.WriteLine("Unable to recover using any available commit");
}
/// <summary>

Просмотреть файл

@ -82,20 +82,6 @@ namespace FASTER.core
}
}
/// <summary>
/// Recover info from token
/// </summary>
/// <param name="logCommitManager"></param>
/// <returns></returns>
internal void Recover(ILogCommitManager logCommitManager)
{
var metadata = logCommitManager.GetCommitMetadata();
if (metadata == null)
throw new FasterException("Invalid log commit metadata during recovery");
Initialize(new BinaryReader(new MemoryStream(metadata)));
}
/// <summary>
/// Reset
/// </summary>

Просмотреть файл

@ -1,12 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.Collections;
using System.Collections.Generic;
namespace FASTER.core
{
/// <summary>
/// Log commit manager
/// </summary>
public interface ILogCommitManager
public interface ILogCommitManager : IDisposable
{
/// <summary>
/// Perform (synchronous) commit with specified metadata
@ -17,9 +21,16 @@ namespace FASTER.core
void Commit(long beginAddress, long untilAddress, byte[] commitMetadata);
/// <summary>
/// Return prior commit metadata during recovery
/// Return commit metadata
/// </summary>
/// <param name="commitNum"></param>
/// <returns></returns>
byte[] GetCommitMetadata(long commitNum);
/// <summary>
/// Get list of commits, in order of usage preference
/// </summary>
/// <returns></returns>
byte[] GetCommitMetadata();
public IEnumerable<long> ListCommits();
}
}

Просмотреть файл

@ -1,12 +1,13 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System.Collections.Generic;
using System.IO;
namespace FASTER.core
{
/// <summary>
/// Implementation of checkpoint interface for local file storage
/// Older implementation of checkpoint interface for local file storage (left for backward compatibility)
/// </summary>
public sealed class LocalLogCommitManager : ILogCommitManager
{
@ -30,35 +31,48 @@ namespace FASTER.core
public void Commit(long beginAddress, long untilAddress, byte[] commitMetadata)
{
// Two phase to ensure we write metadata in single Write operation
using (var ms = new MemoryStream())
using var ms = new MemoryStream();
using (var writer = new BinaryWriter(ms))
{
using (var writer = new BinaryWriter(ms))
{
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
}
using (var writer = new BinaryWriter(new FileStream(commitFile, FileMode.OpenOrCreate)))
{
writer.Write(ms.ToArray());
writer.Flush();
}
writer.Write(commitMetadata.Length);
writer.Write(commitMetadata);
}
using (var writer = new BinaryWriter(new FileStream(commitFile, FileMode.OpenOrCreate)))
{
writer.Write(ms.ToArray());
writer.Flush();
}
}
/// <summary>
/// Dispose
/// </summary>
public void Dispose()
{
}
/// <summary>
/// Retrieve commit metadata
/// </summary>
/// <returns>Metadata, or null if invalid</returns>
public byte[] GetCommitMetadata()
public byte[] GetCommitMetadata(long commitNum)
{
if (!File.Exists(commitFile))
return null;
using (var reader = new BinaryReader(new FileStream(commitFile, FileMode.Open)))
{
var len = reader.ReadInt32();
return reader.ReadBytes(len);
}
using var reader = new BinaryReader(new FileStream(commitFile, FileMode.Open));
var len = reader.ReadInt32();
return reader.ReadBytes(len);
}
/// <summary>
/// List of commit numbers
/// </summary>
/// <returns></returns>
public IEnumerable<long> ListCommits()
{
// we only use a single commit file in this implementation
yield return 0;
}
}
}

Просмотреть файл

@ -8,7 +8,7 @@ namespace FASTER.core
{
class DirectoryConfiguration
{
private readonly string checkpointDir;
internal readonly string checkpointDir;
public DirectoryConfiguration(string checkpointDir)
{

Просмотреть файл

@ -0,0 +1,31 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
namespace FASTER.core
{
    /// <summary>
    /// Fully qualified location of a file: a relative directory plus the file (or blob) name.
    /// Works for both local filesystem paths and cloud storage.
    /// </summary>
    public struct FileDescriptor
    {
        /// <summary>
        /// Directory (or directory path) relative to the storage root
        /// </summary>
        public string directoryName;

        /// <summary>
        /// Name of the file or blob within the directory
        /// </summary>
        public string fileName;

        /// <summary>
        /// Construct a descriptor from its two components
        /// </summary>
        /// <param name="directoryName">Relative directory name or path</param>
        /// <param name="fileName">Actual file or blob name</param>
        public FileDescriptor(string directoryName, string fileName)
        {
            this.fileName = fileName;
            this.directoryName = directoryName;
        }
    }
}

Просмотреть файл

@ -2,6 +2,7 @@
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
namespace FASTER.core
{
@ -19,7 +20,6 @@ namespace FASTER.core
/// CommitIndexCheckpoint (for index checkpoints) ->
///
/// Recovery:
/// GetLatestCheckpoint (if request to recover to latest checkpoint) ->
/// GetIndexCommitMetadata ->
/// GetLogCommitMetadata ->
/// GetIndexDevice ->
@ -28,7 +28,7 @@ namespace FASTER.core
///
/// Provided devices will be closed directly by FASTER when done.
/// </summary>
public interface ICheckpointManager
public interface ICheckpointManager : IDisposable
{
/// <summary>
/// Initialize index checkpoint
@ -63,14 +63,27 @@ namespace FASTER.core
/// </summary>
/// <param name="indexToken">Token</param>
/// <returns>Metadata, or null if invalid</returns>
byte[] GetIndexCommitMetadata(Guid indexToken);
byte[] GetIndexCheckpointMetadata(Guid indexToken);
/// <summary>
/// Retrieve commit metadata for specified log checkpoint
/// </summary>
/// <param name="logToken">Token</param>
/// <returns>Metadata, or null if invalid</returns>
byte[] GetLogCommitMetadata(Guid logToken);
byte[] GetLogCheckpointMetadata(Guid logToken);
/// <summary>
/// Get list of index checkpoint tokens, in order of usage preference
/// </summary>
/// <returns></returns>
public IEnumerable<Guid> GetIndexCheckpointTokens();
/// <summary>
/// Get list of log checkpoint tokens, in order of usage preference
/// </summary>
/// <returns></returns>
public IEnumerable<Guid> GetLogCheckpointTokens();
/// <summary>
/// Provide device to store index checkpoint (including overflow buckets)
@ -94,11 +107,8 @@ namespace FASTER.core
IDevice GetSnapshotObjectLogDevice(Guid token);
/// <summary>
/// Get latest valid checkpoint for recovery
/// Cleanup all data (subfolder) related to checkpoints by this manager
/// </summary>
/// <param name="indexToken"></param>
/// <param name="logToken"></param>
/// <returns>true if latest valid checkpoint found, false otherwise</returns>
bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken);
public void PurgeAll();
}
}

Просмотреть файл

@ -20,6 +20,7 @@ namespace FASTER.core
public unsafe partial class FasterBase
{
internal ICheckpointManager checkpointManager;
internal bool disposeCheckpointManager;
// Derived class exposed API
internal void RecoverFuzzyIndex(IndexCheckpointInfo info)

Просмотреть файл

@ -2,13 +2,14 @@
// Licensed under the MIT license.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
namespace FASTER.core
{
/// <summary>
/// Implementation of checkpoint interface for local file storage
/// Older implementation of checkpoint interface for local file storage (left for backward compatibility)
/// </summary>
public sealed class LocalCheckpointManager : ICheckpointManager
{
@ -23,6 +24,14 @@ namespace FASTER.core
directoryConfiguration = new DirectoryConfiguration(CheckpointDir);
}
/// <summary>
/// Cleanup all files in this folder
/// </summary>
public void PurgeAll()
{
new DirectoryInfo(directoryConfiguration.checkpointDir).Delete(true);
}
/// <summary>
/// Initialize index checkpoint
/// </summary>
@ -92,7 +101,7 @@ namespace FASTER.core
/// </summary>
/// <param name="indexToken">Token</param>
/// <returns>Metadata, or null if invalid</returns>
public byte[] GetIndexCommitMetadata(Guid indexToken)
public byte[] GetIndexCheckpointMetadata(Guid indexToken)
{
var dir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder(indexToken));
if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
@ -111,7 +120,7 @@ namespace FASTER.core
/// </summary>
/// <param name="logToken">Token</param>
/// <returns>Metadata, or null if invalid</returns>
public byte[] GetLogCommitMetadata(Guid logToken)
public byte[] GetLogCheckpointMetadata(Guid logToken)
{
var dir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder(logToken));
if (!File.Exists(dir.FullName + Path.DirectorySeparatorChar + "completed.dat"))
@ -155,13 +164,8 @@ namespace FASTER.core
return Devices.CreateLogDevice(directoryConfiguration.GetObjectLogSnapshotFileName(token), false);
}
/// <summary>
/// Get latest valid checkpoint for recovery
/// </summary>
/// <param name="indexToken"></param>
/// <param name="logToken"></param>
/// <returns></returns>
public bool GetLatestCheckpoint(out Guid indexToken, out Guid logToken)
/// <inheritdoc />
public IEnumerable<Guid> GetIndexCheckpointTokens()
{
var indexCheckpointDir = new DirectoryInfo(directoryConfiguration.GetIndexCheckpointFolder());
var dirs = indexCheckpointDir.GetDirectories();
@ -173,15 +177,26 @@ namespace FASTER.core
Directory.Delete(dir.FullName, true);
}
}
var latestICFolder = indexCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First();
if (latestICFolder == null || !Guid.TryParse(latestICFolder.Name, out indexToken))
bool found = false;
foreach (var folder in indexCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime))
{
throw new FasterException("No valid index checkpoint to recover from");
if (Guid.TryParse(folder.Name, out var indexToken))
{
found = true;
yield return indexToken;
}
}
if (!found)
throw new FasterException("No valid index checkpoint to recover from");
}
/// <inheritdoc />
public IEnumerable<Guid> GetLogCheckpointTokens()
{
var hlogCheckpointDir = new DirectoryInfo(directoryConfiguration.GetHybridLogCheckpointFolder());
dirs = hlogCheckpointDir.GetDirectories();
var dirs = hlogCheckpointDir.GetDirectories();
foreach (var dir in dirs)
{
// Remove incomplete checkpoints
@ -190,12 +205,24 @@ namespace FASTER.core
Directory.Delete(dir.FullName, true);
}
}
var latestHLCFolder = hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime).First();
if (latestHLCFolder == null || !Guid.TryParse(latestHLCFolder.Name, out logToken))
bool found = false;
foreach (var folder in hlogCheckpointDir.GetDirectories().OrderByDescending(f => f.LastWriteTime))
{
throw new FasterException("No valid hybrid log checkpoint to recover from");
if (Guid.TryParse(folder.Name, out var logToken))
{
found = true;
yield return logToken;
}
}
return true;
if (!found)
throw new FasterException("No valid hybrid log checkpoint to recover from");
}
/// <inheritdoc />
public void Dispose()
{
}
}
}

Просмотреть файл

@ -5,6 +5,7 @@
using System;
using System.Diagnostics;
using System.Resources;
using System.Threading;
namespace FASTER.core
@ -52,8 +53,63 @@ namespace FASTER.core
private void InternalRecoverFromLatestCheckpoints()
{
checkpointManager.GetLatestCheckpoint(out Guid indexCheckpointGuid, out Guid hybridLogCheckpointGuid);
InternalRecover(indexCheckpointGuid, hybridLogCheckpointGuid);
Debug.WriteLine("********* Primary Recovery Information ********");
HybridLogCheckpointInfo recoveredHLCInfo = default;
foreach (var hybridLogToken in checkpointManager.GetLogCheckpointTokens())
{
try
{
recoveredHLCInfo = new HybridLogCheckpointInfo();
recoveredHLCInfo.Recover(hybridLogToken, checkpointManager);
}
catch
{
continue;
}
Debug.WriteLine("HybridLog Checkpoint: {0}", hybridLogToken);
break;
}
if (recoveredHLCInfo.IsDefault())
throw new FasterException("Unable to find valid index token");
recoveredHLCInfo.info.DebugPrint();
IndexCheckpointInfo recoveredICInfo = default;
foreach (var indexToken in checkpointManager.GetIndexCheckpointTokens())
{
try
{
// Recovery appropriate context information
recoveredICInfo = new IndexCheckpointInfo();
recoveredICInfo.Recover(indexToken, checkpointManager);
}
catch
{
continue;
}
if (!IsCompatible(recoveredICInfo.info, recoveredHLCInfo.info))
{
recoveredICInfo = default;
continue;
}
Debug.WriteLine("Index Checkpoint: {0}", indexToken);
break;
}
if (recoveredICInfo.IsDefault())
throw new FasterException("Unable to find valid index token");
recoveredICInfo.info.DebugPrint();
InternalRecover(recoveredICInfo, recoveredHLCInfo);
}
private bool IsCompatible(in IndexRecoveryInfo indexInfo, in HybridLogRecoveryInfo recoveryInfo)
@ -69,8 +125,6 @@ namespace FASTER.core
Debug.WriteLine("Index Checkpoint: {0}", indexToken);
Debug.WriteLine("HybridLog Checkpoint: {0}", hybridLogToken);
// Ensure active state machine to null
currentSyncStateMachine = null;
// Recovery appropriate context information
var recoveredICInfo = new IndexCheckpointInfo();
@ -87,6 +141,14 @@ namespace FASTER.core
throw new FasterException("Cannot recover from (" + indexToken.ToString() + "," + hybridLogToken.ToString() + ") checkpoint pair!\n");
}
InternalRecover(recoveredICInfo, recoveredHLCInfo);
}
private void InternalRecover(IndexCheckpointInfo recoveredICInfo, HybridLogCheckpointInfo recoveredHLCInfo)
{
// Ensure active state machine to null
currentSyncStateMachine = null;
// Set new system state after recovery
var v = recoveredHLCInfo.info.version;
systemState.phase = Phase.REST;

Просмотреть файл

@ -11,19 +11,36 @@ namespace FASTER.core
/// </summary>
public class FasterException : Exception
{
internal FasterException()
/// <summary>
/// Throw FASTER exception
/// </summary>
public FasterException()
{
}
internal FasterException(string message) : base(message)
/// <summary>
/// Throw FASTER exception
/// </summary>
/// <param name="message"></param>
public FasterException(string message) : base(message)
{
}
internal FasterException(string message, Exception innerException) : base(message, innerException)
/// <summary>
/// Throw FASTER exception
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public FasterException(string message, Exception innerException) : base(message, innerException)
{
}
internal FasterException(SerializationInfo info, StreamingContext context) : base(info, context)
/// <summary>
/// Throw FASTER exception
/// </summary>
/// <param name="info"></param>
/// <param name="context"></param>
public FasterException(SerializationInfo info, StreamingContext context) : base(info, context)
{
}
}

Просмотреть файл

@ -5,11 +5,11 @@ using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.IO;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage.Blob;
using FASTER.core;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
namespace FASTER.devices
{
@ -19,66 +19,141 @@ namespace FASTER.devices
/// </summary>
public class AzureStorageDevice : StorageDeviceBase
{
private CloudBlobContainer container;
private readonly ConcurrentDictionary<int, BlobEntry> blobs;
private readonly CloudBlobDirectory blobDirectory;
private readonly string blobName;
private readonly bool deleteOnClose;
private readonly bool underLease;
internal BlobRequestOptions BlobRequestOptions { get; private set; }
// Page Blobs permit blobs of max size 8 TB, but the emulator permits only 2 GB
private const long MAX_BLOB_SIZE = (long)(2 * 10e8);
// Azure Page Blobs have a fixed sector size of 512 bytes.
private const uint PAGE_BLOB_SECTOR_SIZE = 512;
// Whether blob files are deleted on close
private readonly bool deleteOnClose;
/// <summary>
/// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
/// </summary>
/// <param name="cloudBlobDirectory">Cloud blob directory containing the blobs</param>
/// <param name="blobName">A descriptive name that will be the prefix of all blobs created with this device</param>
/// <param name="blobManager">Blob manager instance</param>
/// <param name="underLease">Whether we use leases</param>
/// <param name="deleteOnClose">
/// True if the program should delete all blobs created on call to <see cref="Close">Close</see>. False otherwise.
/// The container is not deleted even if it was created in this constructor
/// </param>
/// <param name="capacity">The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
public AzureStorageDevice(CloudBlobDirectory cloudBlobDirectory, string blobName, IBlobManager blobManager = null, bool underLease = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED)
: base($"{cloudBlobDirectory}\\{blobName}", PAGE_BLOB_SECTOR_SIZE, capacity)
{
this.blobs = new ConcurrentDictionary<int, BlobEntry>();
this.blobDirectory = cloudBlobDirectory;
this.blobName = blobName;
this.underLease = underLease;
this.deleteOnClose = deleteOnClose;
this.BlobManager = blobManager ?? new DefaultBlobManager(this.underLease, this.blobDirectory);
this.BlobRequestOptions = BlobManager.GetBlobRequestOptions();
StartAsync().Wait();
}
/// <summary>
/// Constructs a new AzureStorageDevice instance, backed by Azure Page Blobs
/// </summary>
/// <param name="connectionString"> The connection string to use when establishing connection to Azure Blobs</param>
/// <param name="containerName">Name of the Azure Blob container to use. If there does not exist a container with the supplied name, one is created</param>
/// <param name="directoryName">Directory within blob container to use.</param>
/// <param name="blobName">A descriptive name that will be the prefix of all blobs created with this device</param>
/// <param name="blobManager">Blob manager instance</param>
/// <param name="underLease">Whether we use leases</param>
/// <param name="deleteOnClose">
/// True if the program should delete all blobs created on call to <see cref="Close">Close</see>. False otherwise.
/// The container is not deleted even if it was created in this constructor
/// </param>
/// <param name="capacity">The maximum number of bytes this storage device can accommodate, or CAPACITY_UNSPECIFIED if there is no such limit </param>
public AzureStorageDevice(string connectionString, string containerName, string blobName, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED)
: base(connectionString + "/" + containerName + "/" + blobName, PAGE_BLOB_SECTOR_SIZE, capacity)
public AzureStorageDevice(string connectionString, string containerName, string directoryName, string blobName, IBlobManager blobManager = null, bool underLease = false, bool deleteOnClose = false, long capacity = Devices.CAPACITY_UNSPECIFIED)
: base($"{connectionString}\\{containerName}\\{directoryName}\\{blobName}", PAGE_BLOB_SECTOR_SIZE, capacity)
{
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(connectionString);
CloudBlobClient client = storageAccount.CreateCloudBlobClient();
container = client.GetContainerReference(containerName);
var storageAccount = CloudStorageAccount.Parse(connectionString);
var client = storageAccount.CreateCloudBlobClient();
var container = client.GetContainerReference(containerName);
container.CreateIfNotExists();
blobs = new ConcurrentDictionary<int, BlobEntry>();
this.blobs = new ConcurrentDictionary<int, BlobEntry>();
this.blobDirectory = container.GetDirectoryReference(directoryName);
this.blobName = blobName;
this.underLease = underLease;
this.deleteOnClose = deleteOnClose;
RecoverBlobs();
this.BlobManager = blobManager ?? new DefaultBlobManager(this.underLease, this.blobDirectory);
this.BlobRequestOptions = BlobManager.GetBlobRequestOptions();
StartAsync().Wait();
}
private void RecoverBlobs()
private async Task StartAsync()
{
// list all the blobs representing the segments
int prevSegmentId = -1;
foreach (IListBlobItem item in container.ListBlobs(blobName))
var prefix = $"{blobDirectory.Prefix}{blobName}.";
BlobContinuationToken continuationToken = null;
do
{
string[] parts = item.Uri.Segments;
int segmentId = Int32.Parse(parts[parts.Length - 1].Replace(blobName, ""));
if (segmentId != prevSegmentId + 1)
if (this.underLease)
{
startSegment = segmentId;
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}
else
var response = await this.blobDirectory.ListBlobsSegmentedAsync(useFlatBlobListing: false, blobListingDetails: BlobListingDetails.None, maxResults: 1000,
currentToken: continuationToken, options: this.BlobRequestOptions, operationContext: null).ConfigureAwait(false);
foreach (IListBlobItem item in response.Results)
{
endSegment = segmentId;
if (item is CloudPageBlob pageBlob)
{
if (Int32.TryParse(pageBlob.Name.Replace(prefix, ""), out int segmentId))
{
if (segmentId != prevSegmentId + 1)
{
startSegment = segmentId;
}
else
{
endSegment = segmentId;
}
prevSegmentId = segmentId;
}
}
}
prevSegmentId = segmentId;
continuationToken = response.ContinuationToken;
}
while (continuationToken != null);
for (int i = startSegment; i <= endSegment; i++)
{
bool ret = blobs.TryAdd(i, new BlobEntry(container.GetPageBlobReference(GetSegmentBlobName(i))));
Debug.Assert(ret, "Recovery of blobs is single-threaded and should not yield any failure due to concurrency");
bool ret = this.blobs.TryAdd(i, new BlobEntry(this.blobDirectory.GetPageBlobReference(GetSegmentBlobName(i)), this.BlobManager));
if (!ret)
{
throw new InvalidOperationException("Recovery of blobs is single-threaded and should not yield any failure due to concurrency");
}
}
}
/// <summary>
/// Is called on exceptions, if non-null; can be set by application
/// </summary>
private IBlobManager BlobManager { get; set; }
private string GetSegmentBlobName(int segmentId)
{
return $"{blobName}.{segmentId}";
}
/// <summary>
/// <see cref="IDevice.Close">Inherited</see>
/// </summary>
@ -86,13 +161,12 @@ namespace FASTER.devices
{
// Unlike in LocalStorageDevice, we explicitly remove all page blobs if the deleteOnClose flag is set, instead of relying on the operating system
// to delete files after the end of our process. This leads to potential problems if multiple instances are sharing the same underlying page blobs.
//
// Since this flag is presumably only used for testing though, it is probably fine.
// Since this flag is only used for testing, it is probably fine.
if (deleteOnClose)
{
foreach (var entry in blobs)
{
entry.Value.GetPageBlob().Delete();
entry.Value.PageBlob.Delete();
}
}
}
@ -105,54 +179,137 @@ namespace FASTER.devices
/// <param name="result"></param>
public override void RemoveSegmentAsync(int segment, AsyncCallback callback, IAsyncResult result)
{
if (blobs.TryRemove(segment, out BlobEntry blob))
if (this.blobs.TryRemove(segment, out BlobEntry blob))
{
CloudPageBlob pageBlob = blob.GetPageBlob();
pageBlob.BeginDelete(ar =>
{
try
{
pageBlob.EndDelete(ar);
CloudPageBlob pageBlob = blob.PageBlob;
}
catch (Exception)
{
// Can I do anything else other than printing out an error message?
}
callback(ar);
}, result);
if (this.underLease)
{
this.BlobManager.ConfirmLeaseAsync().GetAwaiter().GetResult();
}
if (!this.BlobManager.CancellationToken.IsCancellationRequested)
{
pageBlob.DeleteAsync(cancellationToken: this.BlobManager.CancellationToken)
.ContinueWith((Task t) =>
{
if (t.IsFaulted)
{
this.BlobManager?.HandleBlobError(nameof(RemoveSegmentAsync), "could not remove page blob for segment", pageBlob?.Name, t.Exception, false);
}
callback(result);
});
}
}
}
//---- The actual read and write accesses to the page blobs
private unsafe Task WritePortionToBlobUnsafeAsync(CloudPageBlob blob, IntPtr sourceAddress, long destinationAddress, long offset, uint length)
{
return this.WritePortionToBlobAsync(new UnmanagedMemoryStream((byte*)sourceAddress + offset, length), blob, sourceAddress, destinationAddress, offset, length);
}
private async Task WritePortionToBlobAsync(UnmanagedMemoryStream stream, CloudPageBlob blob, IntPtr sourceAddress, long destinationAddress, long offset, uint length)
{
try
{
if (this.underLease)
{
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
}
await blob.WritePagesAsync(stream, destinationAddress + offset,
contentChecksum: null, accessCondition: null, options: this.BlobRequestOptions, operationContext: null, cancellationToken: this.BlobManager.CancellationToken).ConfigureAwait(false);
}
catch (Exception exception)
{
this.BlobManager?.HandleBlobError(nameof(WritePortionToBlobAsync), "could not write to page blob", blob?.Name, exception, true);
throw;
}
finally
{
stream.Dispose();
}
}
private unsafe Task ReadFromBlobUnsafeAsync(CloudPageBlob blob, long sourceAddress, long destinationAddress, uint readLength)
{
return this.ReadFromBlobAsync(new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write), blob, sourceAddress, destinationAddress, readLength);
}
private async Task ReadFromBlobAsync(UnmanagedMemoryStream stream, CloudPageBlob blob, long sourceAddress, long destinationAddress, uint readLength)
{
Debug.WriteLine($"AzureStorageDevice.ReadFromBlobAsync Called target={blob.Name}");
try
{
if (this.underLease)
{
Debug.WriteLine($"confirm lease");
await this.BlobManager.ConfirmLeaseAsync().ConfigureAwait(false);
Debug.WriteLine($"confirm lease done");
}
Debug.WriteLine($"starting download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");
await blob.DownloadRangeToStreamAsync(stream, sourceAddress, readLength,
accessCondition: null, options: this.BlobRequestOptions, operationContext: null, cancellationToken: this.BlobManager.CancellationToken);
Debug.WriteLine($"finished download target={blob.Name} readLength={readLength} sourceAddress={sourceAddress}");
if (stream.Position != readLength)
{
throw new InvalidDataException($"wrong amount of data received from page blob, expected={readLength}, actual={stream.Position}");
}
}
catch (Exception exception)
{
this.BlobManager?.HandleBlobError(nameof(ReadFromBlobAsync), "could not read from page blob", blob?.Name, exception, true);
throw new FasterException(nameof(ReadFromBlobAsync) + "could not read from page blob " + blob?.Name, exception);
}
finally
{
stream.Dispose();
}
}
//---- the overridden methods represent the interface for a generic storage device
/// <summary>
/// <see cref="IDevice.ReadAsync(int, ulong, IntPtr, uint, IOCompletionCallback, IAsyncResult)">Inherited</see>
/// </summary>
public override unsafe void ReadAsync(int segmentId, ulong sourceAddress, IntPtr destinationAddress, uint readLength, IOCompletionCallback callback, IAsyncResult asyncResult)
{
Debug.WriteLine($"AzureStorageDevice.ReadAsync Called segmentId={segmentId} sourceAddress={sourceAddress} readLength={readLength}");
// It is up to the allocator to make sure no reads are issued to segments before they are written
if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry)) throw new InvalidOperationException("Attempting to read non-existent segments");
if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
{
var nonLoadedBlob = this.blobDirectory.GetPageBlobReference(GetSegmentBlobName(segmentId));
var exception = new InvalidOperationException("Attempt to read a non-loaded segment");
this.BlobManager?.HandleBlobError(nameof(ReadAsync), exception.Message, nonLoadedBlob?.Name, exception, true);
throw exception;
}
// Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)destinationAddress, readLength, readLength, FileAccess.Write);
CloudPageBlob pageBlob = blobEntry.GetPageBlob();
pageBlob.BeginDownloadRangeToStream(stream, (Int64)sourceAddress, readLength, ar => {
try
{
pageBlob.EndDownloadRangeToStream(ar);
}
// I don't think I can be more specific in catch here because no documentation on exception behavior is provided
catch (Exception e)
{
Trace.TraceError(e.Message);
// Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
// but does not distinguish between them.
callback(2, readLength, ovNative);
}
callback(0, readLength, ovNative);
}, asyncResult);
this.ReadFromBlobUnsafeAsync(blobEntry.PageBlob, (long)sourceAddress, (long)destinationAddress, readLength)
.ContinueWith((Task t) =>
{
if (t.IsFaulted)
{
Debug.WriteLine("AzureStorageDevice.ReadAsync Returned (Failure)");
callback(uint.MaxValue, readLength, ovNative);
}
else
{
Debug.WriteLine("AzureStorageDevice.ReadAsync Returned");
callback(0, readLength, ovNative);
}
});
}
/// <summary>
@ -160,66 +317,76 @@ namespace FASTER.devices
/// </summary>
public override void WriteAsync(IntPtr sourceAddress, int segmentId, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
{
Debug.WriteLine($"AzureStorageDevice.WriteAsync Called segmentId={segmentId} destinationAddress={destinationAddress} numBytesToWrite={numBytesToWrite}");
if (!blobs.TryGetValue(segmentId, out BlobEntry blobEntry))
{
BlobEntry entry = new BlobEntry();
BlobEntry entry = new BlobEntry(this.BlobManager);
if (blobs.TryAdd(segmentId, entry))
{
CloudPageBlob pageBlob = container.GetPageBlobReference(GetSegmentBlobName(segmentId));
CloudPageBlob pageBlob = this.blobDirectory.GetPageBlobReference(GetSegmentBlobName(segmentId));
// If segment size is -1, which denotes absence, we request the largest possible blob. This is okay because
// page blobs are not backed by real pages on creation, and the given size is only the physical limit of
// how large it can grow to.
var size = segmentSize == -1 ? MAX_BLOB_SIZE : segmentSize;
// If no blob exists for the segment, we must first create the segment asynchronously. (Create call takes ~70 ms by measurement)
// After creation is done, we can call write.
entry.CreateAsync(size, pageBlob);
var _ = entry.CreateAsync(size, pageBlob);
}
// Otherwise, some other thread beat us to it. Okay to use their blobs.
blobEntry = blobs[segmentId];
}
TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
this.TryWriteAsync(blobEntry, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void TryWriteAsync(BlobEntry blobEntry, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
{
CloudPageBlob pageBlob = blobEntry.GetPageBlob();
// If pageBlob is null, it is being created. Attempt to queue the write for the creator to complete after it is done
if (pageBlob == null
&& blobEntry.TryQueueAction(p => WriteToBlobAsync(p, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult))) return;
if (blobEntry.PageBlob == null
&& blobEntry.TryQueueAction(p => this.WriteToBlobAsync(p, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult)))
{
return;
}
// Otherwise, invoke directly.
WriteToBlobAsync(pageBlob, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
this.WriteToBlobAsync(blobEntry.PageBlob, sourceAddress, destinationAddress, numBytesToWrite, callback, asyncResult);
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
private unsafe void WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, ulong destinationAddress, uint numBytesToWrite, IOCompletionCallback callback, IAsyncResult asyncResult)
{
// Even though Azure Page Blob does not make use of Overlapped, we populate one to conform to the callback API
Overlapped ov = new Overlapped(0, 0, IntPtr.Zero, asyncResult);
NativeOverlapped* ovNative = ov.UnsafePack(callback, IntPtr.Zero);
UnmanagedMemoryStream stream = new UnmanagedMemoryStream((byte*)sourceAddress, numBytesToWrite);
blob.BeginWritePages(stream, (long)destinationAddress, null, ar =>
{
try
this.WriteToBlobAsync(blob, sourceAddress, (long)destinationAddress, numBytesToWrite)
.ContinueWith((Task t) =>
{
blob.EndWritePages(ar);
}
// I don't think I can be more specific in catch here because no documentation on exception behavior is provided
catch (Exception e)
{
Trace.TraceError(e.Message);
// Is there any documentation on the meaning of error codes here? The handler suggests that any non-zero value is an error
// but does not distinguish between them.
callback(1, numBytesToWrite, ovNative);
}
callback(0, numBytesToWrite, ovNative);
}, asyncResult);
if (t.IsFaulted)
{
Debug.WriteLine("AzureStorageDevice.WriteAsync Returned (Failure)");
callback(uint.MaxValue, numBytesToWrite, ovNative);
}
else
{
Debug.WriteLine("AzureStorageDevice.WriteAsync Returned");
callback(0, numBytesToWrite, ovNative);
}
});
}
private string GetSegmentBlobName(int segmentId)
const int maxPortionSizeForPageBlobWrites = 4 * 1024 * 1024; // 4 MB is a limit on page blob write portions, apparently
private async Task WriteToBlobAsync(CloudPageBlob blob, IntPtr sourceAddress, long destinationAddress, uint numBytesToWrite)
{
return blobName + segmentId;
long offset = 0;
while (numBytesToWrite > 0)
{
var length = Math.Min(numBytesToWrite, maxPortionSizeForPageBlobWrites);
await this.WritePortionToBlobUnsafeAsync(blob, sourceAddress, destinationAddress, offset, length).ConfigureAwait(false);
numBytesToWrite -= length;
offset += length;
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,110 @@
using FASTER.core;
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading.Tasks;
namespace FASTER.devices
{
    /// <summary>
    /// Device factory for Azure page-blob backed devices
    /// </summary>
    public class AzureStorageNamedDeviceFactory : INamedDeviceFactory
    {
        private readonly CloudBlobClient client;
        // Base directory (container + optional path) set by Initialize
        private CloudBlobDirectory baseRef;

        /// <summary>
        /// Create instance of factory for Azure devices
        /// </summary>
        /// <param name="client">Blob client to use for all storage operations</param>
        public AzureStorageNamedDeviceFactory(CloudBlobClient client)
        {
            this.client = client;
        }

        /// <summary>
        /// Create instance of factory for Azure devices
        /// </summary>
        /// <param name="connectionString">Azure storage connection string</param>
        public AzureStorageNamedDeviceFactory(string connectionString)
            : this(CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient())
        {
        }

        /// <inheritdoc />
        public void Delete(FileDescriptor fileInfo)
        {
            var dir = fileInfo.directoryName == "" ? baseRef : baseRef.GetDirectoryReference(fileInfo.directoryName);

            if (fileInfo.fileName != null)
            {
                // We only delete shard 0
                dir.GetBlobReference(fileInfo.fileName + ".0").DeleteIfExists();
            }
            else
            {
                foreach (IListBlobItem blob in dir.ListBlobs(true))
                {
                    // Pattern match covers CloudBlob and all its subclasses (page/block/append blobs),
                    // replacing the fragile GetType()/BaseType comparison
                    if (blob is CloudBlob cloudBlob)
                    {
                        cloudBlob.DeleteIfExists();
                    }
                }
            }
        }

        /// <inheritdoc />
        public IDevice Get(FileDescriptor fileInfo)
        {
            return new AzureStorageDevice(baseRef.GetDirectoryReference(fileInfo.directoryName), fileInfo.fileName);
        }

        /// <inheritdoc />
        public void Initialize(string baseName)
        {
            // baseName is "container[/sub/dirs]"; the container is created if absent
            var path = baseName.Split('/');
            var containerName = path[0];
            var dirName = string.Join("/", path.Skip(1));

            var containerRef = client.GetContainerReference(containerName);
            containerRef.CreateIfNotExists();
            baseRef = containerRef.GetDirectoryReference(dirName);
        }

        /// <inheritdoc />
        public IEnumerable<FileDescriptor> ListContents(string path)
        {
            // Sub-directories first, most recently modified first
            foreach (var entry in baseRef.GetDirectoryReference(path).ListBlobs().OfType<CloudBlobDirectory>()
                .OrderByDescending(f => GetLastModified(f)))
            {
                yield return new FileDescriptor
                {
                    directoryName = entry.Uri.LocalPath,
                    fileName = ""
                };
            }

            // Then loose page blobs at the base, most recently modified first
            foreach (var entry in baseRef.ListBlobs().OfType<CloudPageBlob>()
                .OrderByDescending(f => f.Properties.LastModified))
            {
                yield return new FileDescriptor
                {
                    directoryName = "",
                    fileName = entry.Name
                };
            }
        }

        private DateTimeOffset? GetLastModified(CloudBlobDirectory cloudBlobDirectory)
        {
            // OfType skips items that are not page blobs (the unconditional cast previously threw
            // InvalidCastException on nested directories); FirstOrDefault yields null for an empty
            // directory instead of First() throwing InvalidOperationException.
            return cloudBlobDirectory.ListBlobs().OfType<CloudPageBlob>()
                .Select(e => e.Properties.LastModified).OrderByDescending(e => e).FirstOrDefault();
        }
    }
}

Просмотреть файл

@ -4,7 +4,9 @@
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Storage.Blob;
namespace FASTER.devices
@ -17,64 +19,58 @@ namespace FASTER.devices
// In-progress creation is denoted by a null value on the underlying page blob
class BlobEntry
{
private CloudPageBlob pageBlob;
public CloudPageBlob PageBlob { get; private set; }
private ConcurrentQueue<Action<CloudPageBlob>> pendingWrites;
private readonly IBlobManager blobManager;
private int waitingCount;
/// <summary>
/// Creates a new BlobEntry to hold the given pageBlob. The pageBlob must already be created.
/// </summary>
/// <param name="pageBlob"></param>
public BlobEntry(CloudPageBlob pageBlob)
/// <param name="blobManager"></param>
public BlobEntry(CloudPageBlob pageBlob, IBlobManager blobManager)
{
this.pageBlob = pageBlob;
this.PageBlob = pageBlob;
this.blobManager = blobManager;
if (pageBlob == null)
{
// Only need to allocate a queue when we potentially need to asynchronously create a blob
pendingWrites = new ConcurrentQueue<Action<CloudPageBlob>>();
waitingCount = 0;
}
}
/// <summary>
/// Creates a new BlobEntry, does not initialize a page blob. Use <see cref="CreateAsync(long, CloudPageBlob)"/>
/// for actual creation.
/// </summary>
public BlobEntry() : this(null)
public BlobEntry(IBlobManager blobManager) : this(null, blobManager)
{
}
/// <summary>
/// Getter for the underlying <see cref="CloudPageBlob"/>
/// </summary>
/// <returns>the underlying <see cref="CloudPageBlob"/>, or null if there is none</returns>
public CloudPageBlob GetPageBlob()
{
return pageBlob;
}
/// <summary>
/// Asynchronously invoke create on the given pageBlob.
/// </summary>
/// <param name="size">maximum size of the blob</param>
/// <param name="pageBlob">The page blob to create</param>
public void CreateAsync(long size, CloudPageBlob pageBlob)
public async Task CreateAsync(long size, CloudPageBlob pageBlob)
{
Debug.Assert(waitingCount == 0, "Create should be called on blobs that don't already exist and exactly once");
// Asynchronously create the blob
pageBlob.BeginCreate(size, ar =>
try
{
try
if (this.waitingCount != 0)
{
pageBlob.EndCreate(ar);
}
catch (Exception e)
{
Trace.TraceError(e.Message);
this.blobManager.HandleBlobError(nameof(CreateAsync), "expect to be called on blobs that don't already exist and exactly once", pageBlob?.Name, null, false);
}
await pageBlob.CreateAsync(size,
accessCondition: null, options: this.blobManager.GetBlobRequestOptions(), operationContext: null, this.blobManager.CancellationToken);
// At this point the blob is fully created. After this line all consequent writers will write immediately. We just
// need to clear the queue of pending writers.
this.pageBlob = pageBlob;
this.PageBlob = pageBlob;
// Take a snapshot of the current waiting count. Exactly this many actions will be cleared.
// Swapping in -1 will inform any stragglers that we are not taking their actions and prompt them to retry (and call write directly)
int waitingCountSnapshot = Interlocked.Exchange(ref waitingCount, -1);
@ -87,9 +83,15 @@ namespace FASTER.devices
while (!pendingWrites.TryDequeue(out action)) { }
action(pageBlob);
}
// Mark for deallocation for the GC
pendingWrites = null;
}, null);
}
catch (Exception e)
{
this.blobManager.HandleBlobError(nameof(CreateAsync), "could not create page blob", pageBlob?.Name, e, true);
throw;
}
}
/// <summary>
@ -105,13 +107,16 @@ namespace FASTER.devices
do
{
currentCount = waitingCount;
// If current count became -1, creation is complete. New queue entries will not be processed and we must call the action ourselves.
if (currentCount == -1) return false;
} while (Interlocked.CompareExchange(ref waitingCount, currentCount + 1, currentCount) != currentCount);
// Enqueue last. The creation thread is obliged to wait until it has processed waitingCount many actions.
// It is extremely unlikely that we will get scheduled out here anyways.
pendingWrites.Enqueue(writeAction);
return true;
}
}
}
}

Просмотреть файл

@ -0,0 +1,318 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using Microsoft.Azure.Storage;
using Microsoft.Azure.Storage.Blob;
using Microsoft.Azure.Storage.Blob.Protocol;
using Microsoft.Azure.Storage.RetryPolicies;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace FASTER.devices
{
/// <summary>
/// Default blob manager with lease support. When leases are enabled, the manager
/// acquires a lease on a designated "lease" blob in the given directory, renews it
/// periodically on a background task, and lets callers wait (via
/// <see cref="ConfirmLeaseAsync"/>) until the lease is fresh enough to issue a
/// protected storage access.
/// </summary>
public class DefaultBlobManager : IBlobManager
{
    private readonly CancellationTokenSource cts;
    private readonly bool underLease;
    private string leaseId;

    private readonly TimeSpan LeaseDuration = TimeSpan.FromSeconds(45); // max time the lease stays after unclean shutdown
    private readonly TimeSpan LeaseRenewal = TimeSpan.FromSeconds(30); // how often we renew the lease
    private readonly TimeSpan LeaseSafetyBuffer = TimeSpan.FromSeconds(10); // how much time we want left on the lease before issuing a protected access

    // Measures time elapsed since the last successful acquire/renew; volatile so
    // ConfirmLeaseAsync sees the freshest timer published by the renewal task
    private volatile Stopwatch leaseTimer;

    private Task LeaseMaintenanceLoopTask = Task.CompletedTask;
    private volatile Task NextLeaseRenewalTask = Task.CompletedTask;

    private readonly CloudBlobDirectory leaseDirectory;
    private const string LeaseBlobName = "lease";
    private CloudBlockBlob leaseBlob;

    /// <summary>
    /// Create instance of blob manager
    /// </summary>
    /// <param name="underLease">Should we use blob leases</param>
    /// <param name="leaseDirectory">Directory to store lease file</param>
    public DefaultBlobManager(bool underLease, CloudBlobDirectory leaseDirectory = null)
    {
        this.underLease = underLease;
        this.leaseDirectory = leaseDirectory;
        this.cts = new CancellationTokenSource();

        if (underLease)
        {
            // Start lease acquisition + maintenance loop (fire-and-forget; errors
            // are routed through HandleError/Terminate)
            var _ = StartAsync();
        }
    }

    /// <summary>
    /// Start blob manager and acquire lease
    /// </summary>
    /// <returns></returns>
    private async Task StartAsync()
    {
        this.leaseBlob = this.leaseDirectory.GetBlockBlobReference(LeaseBlobName);
        await this.AcquireOwnership().ConfigureAwait(false);
    }

    /// <summary>
    /// Clean shutdown, wait for everything, then terminate
    /// </summary>
    /// <returns></returns>
    public async Task StopAsync()
    {
        this.cts.Cancel(); // has no effect if already cancelled
        await this.LeaseMaintenanceLoopTask.ConfigureAwait(false); // wait for loop to terminate cleanly
    }

    /// <inheritdoc />
    public CancellationToken CancellationToken => cts.Token;

    /// <inheritdoc />
    public async ValueTask ConfirmLeaseAsync()
    {
        if (!underLease)
            return;

        while (true)
        {
            // Proceed only if enough lease time remains for the access to complete safely
            if (this.leaseTimer?.Elapsed < this.LeaseDuration - this.LeaseSafetyBuffer)
                return;

            Debug.WriteLine("Access is waiting for fresh lease");
            await this.NextLeaseRenewalTask;
        }
    }

    /// <inheritdoc />
    public BlobRequestOptions GetBlobRequestOptions()
    {
        if (underLease)
        {
            // Under lease: fail fast (short linear retries) so we notice lease loss quickly
            return new BlobRequestOptions()
            {
                RetryPolicy = new LinearRetry(TimeSpan.FromSeconds(2), 2),
                NetworkTimeout = TimeSpan.FromSeconds(50),
            };
        }
        else
        {
            // No lease: more patient exponential backoff
            return new BlobRequestOptions()
            {
                RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(4), 4),
                NetworkTimeout = TimeSpan.FromSeconds(50),
            };
        }
    }

    /// <inheritdoc />
    public void HandleBlobError(string where, string message, string blobName, Exception e, bool isFatal)
    {
        // Include the caller-supplied message (previously dropped) in the report
        HandleError(where, $"Encountered storage exception for blob {blobName}: {message}", e, isFatal);
    }

    // Loop until the lease on the lease blob is acquired, creating the blob if needed.
    // On success, publishes the lease timer and starts the maintenance loop.
    private async Task AcquireOwnership()
    {
        var newLeaseTimer = new Stopwatch();

        while (true)
        {
            CancellationToken.ThrowIfCancellationRequested();
            try
            {
                // Restart the timer before the request so elapsed time is conservative
                newLeaseTimer.Restart();
                this.leaseId = await this.leaseBlob.AcquireLeaseAsync(LeaseDuration, null,
                    accessCondition: null, options: this.GetBlobRequestOptions(), operationContext: null, this.CancellationToken).ConfigureAwait(false);
                this.leaseTimer = newLeaseTimer;
                this.LeaseMaintenanceLoopTask = Task.Run(() => this.MaintenanceLoopAsync());
                return;
            }
            catch (StorageException ex) when (LeaseConflictOrExpired(ex))
            {
                Debug.WriteLine("Waiting for lease");

                // the previous owner has not released the lease yet,
                // try again until it becomes available, should be relatively soon
                // as the transport layer is supposed to shut down the previous owner when starting this
                await Task.Delay(TimeSpan.FromSeconds(1), this.CancellationToken).ConfigureAwait(false);

                continue;
            }
            catch (StorageException ex) when (BlobDoesNotExist(ex))
            {
                try
                {
                    // Create blob with empty content, then try again
                    Debug.WriteLine("Creating commit blob");
                    await this.leaseBlob.UploadFromByteArrayAsync(Array.Empty<byte>(), 0, 0).ConfigureAwait(false);
                    continue;
                }
                catch (StorageException ex2) when (LeaseConflictOrExpired(ex2))
                {
                    // creation race, try from top
                    Debug.WriteLine("Creation race observed, retrying");
                    continue;
                }
            }
            catch (Exception e)
            {
                HandleError(nameof(AcquireOwnership), "Could not acquire lease", e, true);
                throw;
            }
        }
    }

    // Central error sink: logs, then optionally terminates the manager.
    // exception may be null (e.g., BlobEntry reports misuse without an exception).
    private void HandleError(string context, string message, Exception exception, bool terminate)
    {
        Debug.WriteLine(context + ": " + message + ", " + exception?.ToString());

        // terminate in response to the error
        if (terminate && !cts.IsCancellationRequested)
        {
            Terminate();
        }
    }

    // One renewal step: wait out the renewal interval, then renew and publish a fresh timer
    private async Task RenewLeaseTask()
    {
        try
        {
            await Task.Delay(this.LeaseRenewal, this.CancellationToken).ConfigureAwait(false);
            AccessCondition acc = new AccessCondition() { LeaseId = this.leaseId };
            var nextLeaseTimer = new Stopwatch();
            // Start before the request so elapsed time is an over-estimate
            nextLeaseTimer.Start();
            await this.leaseBlob.RenewLeaseAsync(acc, this.CancellationToken).ConfigureAwait(false);
            this.leaseTimer = nextLeaseTimer;
        }
        catch (Exception)
        {
            Debug.WriteLine("Failed to renew lease");
            throw;
        }
    }

    // Background loop that keeps the lease alive; exits on cancellation, lease
    // loss, or other errors, then releases the lease (clean shutdown only) and terminates
    private async Task MaintenanceLoopAsync()
    {
        try
        {
            while (true)
            {
                // save the task so storage accesses can wait for it
                this.NextLeaseRenewalTask = this.RenewLeaseTask();

                // wait for successful renewal, or exit the loop as this throws
                await this.NextLeaseRenewalTask.ConfigureAwait(false);
            }
        }
        catch (OperationCanceledException)
        {
            // it's o.k. to cancel while waiting
            Debug.WriteLine("Lease renewal loop cleanly canceled");
        }
        catch (StorageException e) when (e.InnerException != null && e.InnerException is OperationCanceledException)
        {
            // it's o.k. to cancel a lease renewal
            Debug.WriteLine("Lease renewal storage operation canceled");
        }
        catch (StorageException ex) when (LeaseConflict(ex))
        {
            // We lost the lease to someone else. Terminate ownership immediately.
            HandleError(nameof(MaintenanceLoopAsync), "Lost lease", ex, true);
        }
        catch (Exception e) when (!IsFatal(e))
        {
            HandleError(nameof(MaintenanceLoopAsync), "Could not maintain lease", e, true);
        }

        Debug.WriteLine("Exited lease maintenance loop");

        if (this.CancellationToken.IsCancellationRequested)
        {
            // this is an unclean shutdown, so we let the lease expire to protect straggling storage accesses
            Debug.WriteLine("Leaving lease to expire on its own");
        }
        else
        {
            try
            {
                Debug.WriteLine("Releasing lease");

                AccessCondition acc = new AccessCondition() { LeaseId = this.leaseId };

                await this.leaseBlob.ReleaseLeaseAsync(accessCondition: acc,
                    options: this.GetBlobRequestOptions(), operationContext: null, cancellationToken: this.CancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // it's o.k. if termination is triggered while waiting
            }
            catch (StorageException e) when (e.InnerException != null && e.InnerException is OperationCanceledException)
            {
                // it's o.k. if termination is triggered while we are releasing the lease
            }
            catch (Exception)
            {
                Debug.WriteLine("could not release lease for " + this.leaseBlob.Name);
                // swallow exceptions when releasing a lease
            }
        }

        Terminate();

        Debug.WriteLine("Blob manager stopped");
    }

    // Cancel the shared token; cancellation-callback failures are reported but not rethrown
    private void Terminate()
    {
        try
        {
            cts.Cancel();
        }
        catch (AggregateException aggregate)
        {
            foreach (var e in aggregate.InnerExceptions)
            {
                HandleError("Terminate", "Encountered exception while canceling token", e, false);
            }
        }
        catch (Exception e)
        {
            HandleError("Terminate", "Encountered exception while canceling token", e, false);
        }
    }

    private static bool LeaseConflict(StorageException e)
    {
        return (e.RequestInformation.HttpStatusCode == 409);
    }

    private static bool LeaseConflictOrExpired(StorageException e)
    {
        return (e.RequestInformation.HttpStatusCode == 409) || (e.RequestInformation.HttpStatusCode == 412);
    }

    private static bool BlobDoesNotExist(StorageException e)
    {
        var information = e.RequestInformation.ExtendedErrorInformation;
        // ExtendedErrorInformation can be null (e.g., responses without an error
        // body), so guard the dereference before comparing the error code
        return (e.RequestInformation.HttpStatusCode == 404) && (information?.ErrorCode == BlobErrorCodeStrings.BlobNotFound);
    }

    private static bool IsFatal(Exception exception)
    {
        if (exception is OutOfMemoryException || exception is StackOverflowException)
        {
            return true;
        }
        return false;
    }
}
}

Просмотреть файл

@ -37,7 +37,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.1.3" />
<PackageReference Include="Microsoft.Azure.Storage.Blob" Version="11.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\core\FASTER.core.csproj" />

Просмотреть файл

@ -16,15 +16,15 @@
<tags>key-value store dictionary hashtable concurrent log persistent azure storage FASTER</tags>
<dependencies>
<group targetFramework="net461">
<dependency id="Microsoft.Azure.Storage.Blob" version="11.1.3" />
<dependency id="Microsoft.Azure.Storage.Blob" version="11.2.0" />
<dependency id="Microsoft.FASTER" version="$version$" />
</group>
<group targetFramework="netstandard2.0">
<dependency id="Microsoft.Azure.Storage.Blob" version="11.1.3" />
<dependency id="Microsoft.Azure.Storage.Blob" version="11.2.0" />
<dependency id="Microsoft.FASTER" version="$version$" />
</group>
<group targetFramework="netstandard2.1">
<dependency id="Microsoft.Azure.Storage.Blob" version="11.1.3" />
<dependency id="Microsoft.Azure.Storage.Blob" version="11.2.0" />
<dependency id="Microsoft.FASTER" version="$version$" />
</group>
</dependencies>

Просмотреть файл

@ -0,0 +1,43 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using Microsoft.Azure.Storage.Blob;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace FASTER.devices
{
/// <summary>
/// Manager for blobs, can be shared across devices. Supplies the request options,
/// cancellation token, error handling, and lease confirmation that devices use
/// when issuing blob operations.
/// </summary>
public interface IBlobManager
{
    /// <summary>
    /// Get blob request options (e.g., retry policy and timeouts) to apply to
    /// blob operations issued through this manager
    /// </summary>
    /// <returns>Request options for blob operations</returns>
    BlobRequestOptions GetBlobRequestOptions();

    /// <summary>
    /// Cancellation token for blob operations; implementations cancel it when
    /// the manager shuts down or terminates
    /// </summary>
    CancellationToken CancellationToken { get; }

    /// <summary>
    /// Error handler for blob operations
    /// </summary>
    /// <param name="where">Name of the operation or method where the error occurred</param>
    /// <param name="message">Human-readable description of the error</param>
    /// <param name="blobName">Name of the blob involved, if any</param>
    /// <param name="e">Exception that triggered the error; may be null for reported misuse without an exception</param>
    /// <param name="isFatal">Whether the error should be treated as fatal by the manager</param>
    void HandleBlobError(string where, string message, string blobName, Exception e, bool isFatal);

    /// <summary>
    /// Confirm lease ownership; when leases are in use, completes only once the
    /// lease is fresh enough to safely issue a protected access
    /// </summary>
    /// <returns>Task that completes when the access may proceed</returns>
    ValueTask ConfirmLeaseAsync();
}
}

Просмотреть файл

@ -32,7 +32,7 @@ namespace FASTER.test
public void PageBlobWriteRead()
{
if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
TestDeviceWriteRead(new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false));
TestDeviceWriteRead(new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "PageBlobWriteRead", "BasicDiskFASTERTests"));
}
[Test]
@ -42,7 +42,7 @@ namespace FASTER.test
IDevice localDevice = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\BasicDiskFASTERTests.log", deleteOnClose: true, capacity: 1 << 30);
if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
{
IDevice cloudDevice = new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "BasicDiskFASTERTests", false);
IDevice cloudDevice = new AzureStorageDevice(EMULATED_STORAGE_STRING, TEST_CONTAINER, "TieredWriteRead", "BasicDiskFASTERTests");
tested = new TieredStorageDevice(1, localDevice, cloudDevice);
}
else

Просмотреть файл

@ -0,0 +1,75 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.
using System;
using System.IO;
using System.Linq;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
using FASTER.devices;
using NUnit.Framework;
namespace FASTER.test
{
[TestFixture]
internal class DeviceFasterLogTests
{
    const int entryLength = 100;
    const int numEntries = 100000;
    private FasterLog log;

    public const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
    public const string TEST_CONTAINER = "test";
    // Removed unused private fields 'device' and 'commitPath' (never read or
    // assigned anywhere in this fixture)

    /// <summary>
    /// End-to-end FasterLog test against the Azure page-blob device, using a
    /// device-based commit manager. Runs only when RunAzureTests=yes (requires
    /// the storage emulator).
    /// </summary>
    [Test]
    public void PageBlobFasterLogTest1([Values] LogChecksumType logChecksum)
    {
        if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
        {
            var device = new AzureStorageDevice(EMULATED_STORAGE_STRING, $"{TEST_CONTAINER}", "PageBlobFasterLogTest1", "fasterlog.log", deleteOnClose: true);
            var checkpointManager = new DeviceLogCommitCheckpointManager(
                new AzureStorageNamedDeviceFactory(EMULATED_STORAGE_STRING),
                new DefaultCheckpointNamingScheme($"{TEST_CONTAINER}/PageBlobFasterLogTest1"));
            FasterLogTest1(logChecksum, device, checkpointManager);
            device.Close();
            // Clean up all checkpoint/commit blobs created by the test
            checkpointManager.PurgeAll();
            checkpointManager.Dispose();
        }
    }

    // Enqueues numEntries fixed-pattern entries, commits, then scans them back,
    // verifying content and periodically truncating the log behind the iterator
    private void FasterLogTest1(LogChecksumType logChecksum, IDevice device, ILogCommitManager logCommitManager)
    {
        log = new FasterLog(new FasterLogSettings { PageSizeBits = 20, SegmentSizeBits = 20, LogDevice = device, LogChecksum = logChecksum, LogCommitManager = logCommitManager });

        byte[] entry = new byte[entryLength];
        for (int i = 0; i < entryLength; i++)
            entry[i] = (byte)i;

        for (int i = 0; i < numEntries; i++)
        {
            log.Enqueue(entry);
        }
        // Blocking commit so all entries are durable before the scan
        log.Commit(true);

        using (var iter = log.Scan(0, long.MaxValue))
        {
            int count = 0;
            while (iter.GetNext(out byte[] result, out int length, out long currentAddress))
            {
                count++;
                Assert.IsTrue(result.SequenceEqual(entry));
                if (count % 100 == 0)
                    log.TruncateUntil(iter.NextAddress);
            }
            Assert.IsTrue(count == numEntries);
        }
        log.Dispose();
    }
}
}

Просмотреть файл

@ -4,6 +4,7 @@
using System;
using System.IO;
using System.Linq;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;
@ -19,12 +20,21 @@ namespace FASTER.test
const int numEntries = 1000000;
private FasterLog log;
private IDevice device;
private string commitPath;
[SetUp]
public void Setup()
{
commitPath = new DefaultCheckpointNamingScheme().FasterLogCommitBasePath();
if (commitPath == "")
throw new Exception("Write log commits to separate folder for testing");
commitPath = TestContext.CurrentContext.TestDirectory + "\\" + commitPath;
if (Directory.Exists(commitPath))
DeleteDirectory(commitPath);
if (File.Exists(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit"))
File.Delete(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit");
device = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log", deleteOnClose: true);
}
@ -32,6 +42,8 @@ namespace FASTER.test
public void TearDown()
{
device.Close();
if (Directory.Exists(commitPath))
DeleteDirectory(commitPath);
if (File.Exists(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit"))
File.Delete(TestContext.CurrentContext.TestDirectory + "\\fasterlog.log.commit");
}
@ -243,5 +255,81 @@ namespace FASTER.test
}
}
}
[Test]
public async Task ResumePersistedReader2([Values] LogChecksumType logChecksum, [Values] bool overwriteLogCommits, [Values] bool removeOutdated)
{
    // Verifies that a named log iterator resumes from its persisted
    // completed-until address after the log is disposed and re-opened.
    var input1 = new byte[] { 0, 1, 2, 3 };
    var input2 = new byte[] { 4, 5, 6, 7, 8, 9, 10 };
    var input3 = new byte[] { 11, 12 };
    string readerName = "abc";

    // Dedicated commit folder so state from other tests cannot interfere;
    // cleared up front in case a previous run left residue behind
    var commitPath = TestContext.CurrentContext.TestDirectory + "\\ResumePersistedReader2";

    if (Directory.Exists(commitPath))
        DeleteDirectory(commitPath);

    using (var logCommitManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(commitPath), overwriteLogCommits, removeOutdated))
    {
        long originalCompleted;

        // First session: enqueue and commit three entries, then advance a named
        // iterator past the first entry and persist its progress via CommitAsync
        using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
        {
            await l.EnqueueAsync(input1);
            await l.CommitAsync();
            await l.EnqueueAsync(input2);
            await l.CommitAsync();
            await l.EnqueueAsync(input3);
            await l.CommitAsync();

            long recoveryAddress;

            using (var originalIterator = l.Scan(0, long.MaxValue, readerName))
            {
                // Mark the first entry as completed; the commit below persists
                // this completed-until address under the reader's name
                originalIterator.GetNext(out _, out _, out _, out recoveryAddress);
                originalIterator.CompleteUntil(recoveryAddress);
                originalIterator.GetNext(out _, out _, out _, out _); // move the reader ahead
                await l.CommitAsync();
                originalCompleted = originalIterator.CompletedUntilAddress;
            }
        }

        // Second session: re-open the log; scanning under the same reader name
        // must resume after the completed first entry
        using (var l = new FasterLog(new FasterLogSettings { LogDevice = device, PageSizeBits = 16, MemorySizeBits = 16, LogChecksum = logChecksum, LogCommitManager = logCommitManager }))
        {
            using (var recoveredIterator = l.Scan(0, long.MaxValue, readerName))
            {
                recoveredIterator.GetNext(out byte[] outBuf, out _, out _, out _);

                // we should have read in input2, not input1 or input3
                Assert.True(input2.SequenceEqual(outBuf), $"Original: {input2[0]}, Recovered: {outBuf[0]}, Original: {originalCompleted}, Recovered: {recoveredIterator.CompletedUntilAddress}");

                // TestContext.Progress.WriteLine($"Original: {originalCompleted}, Recovered: {recoveredIterator.CompletedUntilAddress}");
            }
        }
    }
    DeleteDirectory(commitPath);
}
/// <summary>
/// Recursively removes <paramref name="path"/> and everything underneath it.
/// Children are removed depth-first, and the final delete is retried once on
/// IOException or UnauthorizedAccessException (e.g., a handle not yet released).
/// </summary>
private static void DeleteDirectory(string path)
{
    // Depth-first: clear out child directories before attempting the parent
    foreach (var child in Directory.GetDirectories(path))
        DeleteDirectory(child);

    try
    {
        Directory.Delete(path, true);
    }
    catch (Exception e) when (e is IOException || e is UnauthorizedAccessException)
    {
        // Transient failure -- retry the delete once
        Directory.Delete(path, true);
    }
}
}
}

Просмотреть файл

@ -180,7 +180,11 @@ namespace FASTER.test.recovery.objectstore
// Test outputs
var checkpointInfo = default(HybridLogRecoveryInfo);
checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path));
checkpointInfo.Recover(cprVersion,
new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(test_path).FullName)));
// Compute expected array
long[] expected = new long[numUniqueKeys];

Просмотреть файл

@ -217,7 +217,11 @@ namespace FASTER.test.recovery.sumstore
// Test outputs
var checkpointInfo = default(HybridLogRecoveryInfo);
checkpointInfo.Recover(cprVersion, new LocalCheckpointManager(test_path));
checkpointInfo.Recover(cprVersion,
new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(test_path).FullName)));
// Compute expected array
long[] expected = new long[numUniqueKeys];

Просмотреть файл

@ -169,7 +169,11 @@ namespace FASTER.test.recovery.sumstore
private void Test(FasterTestInstance fasterInstance, Guid checkpointToken)
{
var checkpointInfo = default(HybridLogRecoveryInfo);
checkpointInfo.Recover(checkpointToken, new LocalCheckpointManager(fasterInstance.CheckpointDirectory));
checkpointInfo.Recover(checkpointToken,
new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme(
new DirectoryInfo(fasterInstance.CheckpointDirectory).FullName)));
// Create array for reading
var inputArray = new AdInput[numUniqueKeys];

Просмотреть файл

@ -10,6 +10,7 @@ using System.Linq;
using FASTER.core;
using System.IO;
using NUnit.Framework;
using FASTER.devices;
namespace FASTER.test.recovery.sumstore.simple
{
@ -20,27 +21,61 @@ namespace FASTER.test.recovery.sumstore.simple
private FasterKV<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions> fht1;
private FasterKV<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions> fht2;
private IDevice log;
public const string EMULATED_STORAGE_STRING = "UseDevelopmentStorage=true;";
public const string TEST_CONTAINER = "checkpoints4";
[TestCase(CheckpointType.FoldOver)]
[TestCase(CheckpointType.Snapshot)]
public void SimpleRecoveryTest1(CheckpointType checkpointType)
public void PageBlobSimpleRecoveryTest(CheckpointType checkpointType)
{
log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\SimpleRecoveryTest1.log", deleteOnClose: true);
if ("yes".Equals(Environment.GetEnvironmentVariable("RunAzureTests")))
{
ICheckpointManager checkpointManager = new DeviceLogCommitCheckpointManager(
new AzureStorageNamedDeviceFactory(EMULATED_STORAGE_STRING),
new DefaultCheckpointNamingScheme($"{TEST_CONTAINER}/PageBlobSimpleRecoveryTest"));
SimpleRecoveryTest1(checkpointType, checkpointManager);
checkpointManager.PurgeAll();
checkpointManager.Dispose();
}
}
Directory.CreateDirectory(TestContext.CurrentContext.TestDirectory + "\\checkpoints4");
[TestCase(CheckpointType.FoldOver)]
[TestCase(CheckpointType.Snapshot)]
public void LocalDeviceSimpleRecoveryTest(CheckpointType checkpointType)
{
ICheckpointManager checkpointManager = new DeviceLogCommitCheckpointManager(
new LocalStorageNamedDeviceFactory(),
new DefaultCheckpointNamingScheme($"{TEST_CONTAINER}/PageBlobSimpleRecoveryTest"));
SimpleRecoveryTest1(checkpointType, checkpointManager);
checkpointManager.PurgeAll();
checkpointManager.Dispose();
}
[TestCase(CheckpointType.FoldOver)]
[TestCase(CheckpointType.Snapshot)]
public void SimpleRecoveryTest1(CheckpointType checkpointType, ICheckpointManager checkpointManager = null)
{
string checkpointDir = TestContext.CurrentContext.TestDirectory + $"\\{TEST_CONTAINER}";
if (checkpointManager != null)
checkpointDir = null;
log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\SimpleRecoveryTest1.log", deleteOnClose: true);
fht1 = new FasterKV
<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions>
(128, new SimpleFunctions(),
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints4", CheckPointType = checkpointType }
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir, CheckpointManager = checkpointManager, CheckPointType = checkpointType }
);
fht2 = new FasterKV
<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions>
(128, new SimpleFunctions(),
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointDir = TestContext.CurrentContext.TestDirectory + "\\checkpoints4", CheckPointType = checkpointType }
checkpointSettings: new CheckpointSettings { CheckpointDir = checkpointDir, CheckpointManager = checkpointManager, CheckPointType = checkpointType }
);
@ -84,6 +119,78 @@ namespace FASTER.test.recovery.sumstore.simple
log.Close();
fht1.Dispose();
fht2.Dispose();
if (checkpointManager == null)
new DirectoryInfo(checkpointDir).Delete(true);
}
[TestCase(CheckpointType.FoldOver)]
[TestCase(CheckpointType.Snapshot)]
public void SimpleRecoveryTest2(CheckpointType checkpointType)
{
var checkpointManager = new DeviceLogCommitCheckpointManager(new LocalStorageNamedDeviceFactory(), new DefaultCheckpointNamingScheme(TestContext.CurrentContext.TestDirectory + "\\checkpoints4"), false);
log = Devices.CreateLogDevice(TestContext.CurrentContext.TestDirectory + "\\SimpleRecoveryTest2.log", deleteOnClose: true);
// Directory.CreateDirectory(TestContext.CurrentContext.TestDirectory + "\\checkpoints4");
fht1 = new FasterKV
<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions>
(128, new SimpleFunctions(),
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager, CheckPointType = checkpointType }
);
fht2 = new FasterKV
<AdId, NumClicks, AdInput, Output, Empty, SimpleFunctions>
(128, new SimpleFunctions(),
logSettings: new LogSettings { LogDevice = log, MutableFraction = 0.1, MemorySizeBits = 29 },
checkpointSettings: new CheckpointSettings { CheckpointManager = checkpointManager, CheckPointType = checkpointType }
);
int numOps = 5000;
var inputArray = new AdId[numOps];
for (int i = 0; i < numOps; i++)
{
inputArray[i].adId = i;
}
NumClicks value;
AdInput inputArg = default;
Output output = default;
var session1 = fht1.NewSession();
for (int key = 0; key < numOps; key++)
{
value.numClicks = key;
session1.Upsert(ref inputArray[key], ref value, Empty.Default, 0);
}
fht1.TakeFullCheckpoint(out Guid token);
fht1.CompleteCheckpointAsync().GetAwaiter().GetResult();
session1.Dispose();
fht2.Recover(token);
var session2 = fht2.NewSession();
for (int key = 0; key < numOps; key++)
{
var status = session2.Read(ref inputArray[key], ref inputArg, ref output, Empty.Default, 0);
if (status == Status.PENDING)
session2.CompletePending(true);
else
{
Assert.IsTrue(output.value.numClicks == key);
}
}
session2.Dispose();
log.Close();
fht1.Dispose();
fht2.Dispose();
checkpointManager.Dispose();
new DirectoryInfo(TestContext.CurrentContext.TestDirectory + "\\checkpoints4").Delete(true);
}