Merged PR 695621: Add span-based serialization/deserialization to LLS

This PR adds an option to use span-based serialization/deserialization logic for LLS for code unification and performance reasons.

The PR adds a flag to switch between the old and the new logic, and a test that shows the difference.
The new version is roughly 2.5-3x faster than the old version.

More specific data:

Old version: WriteDuration: 9444ms, ReadDuration: 9696ms
New version: WriteDuration: 3831ms, ReadDuration: 3119ms

Related work items: #2014041
This commit is contained in:
Sergey Tepliakov 2023-03-02 20:07:19 +00:00
Родитель f9b93b841b
Коммит 989eebf706
31 изменённых файлов: 1518 добавлений и 143 удалений

Просмотреть файл

@ -8,10 +8,10 @@ using System.Linq;
using System.Text;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using BuildXL.Utilities.Core;
using BuildXL.Utilities.Collections;
using BuildXL.Utilities.Serialization;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
{
@ -85,17 +85,15 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
/// <nodoc />
public static ContentLocationEventData Deserialize(BuildXLReader reader, DateTime eventTimeUtc)
{
Contract.Requires(reader != null);
var kind = (EventKind)reader.ReadByte();
var sender = MachineId.Deserialize(reader);
var hashes = reader.ReadReadOnlyList(r => r.ReadShortHash());
var hashes = reader.ReadArray(static r => r.ReadShortHash());
switch (kind)
{
case EventKind.AddLocation:
case EventKind.AddLocationWithoutTouching:
return new AddContentLocationEventData(sender, hashes, reader.ReadReadOnlyList(r => r.ReadInt64Compact()), touch: kind == EventKind.AddLocation);
return new AddContentLocationEventData(sender, hashes, reader.ReadArray(static r => r.ReadInt64Compact()), touch: kind == EventKind.AddLocation);
case EventKind.RemoveLocation:
return new RemoveContentLocationEventData(sender, hashes);
case EventKind.Touch:
@ -103,22 +101,68 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
case EventKind.Blob:
return new BlobContentLocationEventData(sender, reader.ReadString());
case EventKind.UpdateMetadataEntry:
return new UpdateMetadataEntryEventData(sender, reader);
return UpdateMetadataEntryEventData.Deserialize(reader, sender);
default:
throw new ArgumentOutOfRangeException($"Unknown event kind '{kind}'.");
}
}
/// <summary>
/// Deserializes a single event from <paramref name="reader"/> (the span-based counterpart of the <see cref="BuildXLReader"/>-based overload).
/// </summary>
/// <remarks>
/// The data is read in the following order: one byte with the <see cref="EventKind"/>, the sender's <see cref="MachineId"/>,
/// an array of short hashes and then a kind-specific payload (content sizes, a blob id or a metadata entry).
/// </remarks>
/// <param name="reader">The reader positioned at the beginning of a serialized event. The reader is advanced as the data is consumed.</param>
/// <param name="eventTimeUtc">The time used as the access time of a touch event, because touch events carry no timestamp in their payload.</param>
/// <exception cref="ArgumentOutOfRangeException">The serialized event kind is not handled by this method.</exception>
public static ContentLocationEventData Deserialize(ref SpanReader reader, DateTime eventTimeUtc)
{
    var kind = (EventKind)reader.ReadByte();
    var sender = MachineId.Deserialize(ref reader);

    // The hashes are always present in the payload, even for the kinds that do not use them (blob and metadata events).
    var hashes = reader.ReadArray(static (ref SpanReader r) => r.ReadShortHash());

    switch (kind)
    {
        case EventKind.AddLocation:
        case EventKind.AddLocationWithoutTouching:
            return new AddContentLocationEventData(sender, hashes, reader.ReadArray(static (ref SpanReader r) => r.ReadInt64Compact()), touch: kind == EventKind.AddLocation);
        case EventKind.RemoveLocation:
            return new RemoveContentLocationEventData(sender, hashes);
        case EventKind.Touch:
            return new TouchContentLocationEventData(sender, hashes, eventTimeUtc);
        case EventKind.Blob:
            return new BlobContentLocationEventData(sender, reader.ReadString());
        case EventKind.UpdateMetadataEntry:
            return UpdateMetadataEntryEventData.Deserialize(ref reader, sender);
        default:
            // The single-string constructor of ArgumentOutOfRangeException interprets its argument as a parameter name,
            // so the overload with an explicit message is used to keep the text as the actual error message.
            throw new ArgumentOutOfRangeException(nameof(kind), kind, $"Unknown event kind '{kind}'.");
    }
}
/// <summary>
/// Writes this event into <paramref name="writer"/>: the serialization kind, the sender, the content hashes
/// and then the payload that is specific to the concrete event type.
/// </summary>
/// <remarks>
/// Event types that are not covered here write nothing beyond the common part;
/// a derived type with extra state is expected to override this method and append its own data.
/// </remarks>
public virtual void Serialize(ref SpanWriter writer)
{
    // Common part shared by all the events.
    writer.Write((byte)SerializationKind);
    Sender.Serialize(ref writer);
    writer.Write(ContentHashes, static (ref SpanWriter w, ShortHash hash) => w.Write(hash));

    // Type-specific part. The checks are performed in the same order as the deserialization expects the data.
    if (this is AddContentLocationEventData addEvent)
    {
        writer.Write(addEvent.ContentSizes, static (ref SpanWriter w, long size) => w.WriteCompact(size));
    }
    else if (this is RemoveContentLocationEventData || this is TouchContentLocationEventData)
    {
        // Do nothing. No extra data. Touch timestamp is taken from event enqueue time
    }
    else if (this is BlobContentLocationEventData blobEvent)
    {
        writer.Write(blobEvent.BlobId);
    }
}
/// <nodoc />
public virtual void Serialize(BuildXLWriter writer)
{
writer.Write((byte)SerializationKind);
Sender.Serialize(writer);
writer.WriteReadOnlyList(ContentHashes, (w, hash) => w.Write(hash));
writer.WriteReadOnlyList(ContentHashes, static (w, hash) => w.Write(hash));
switch (this) {
case AddContentLocationEventData addContentLocationEventData:
writer.WriteReadOnlyList(addContentLocationEventData.ContentSizes, (w, size) => w.WriteCompact(size));
writer.WriteReadOnlyList(addContentLocationEventData.ContentSizes, static (w, size) => w.WriteCompact(size));
break;
case RemoveContentLocationEventData removeContentLocationEventData:
case TouchContentLocationEventData touchContentLocationEventData:
@ -172,6 +216,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
case TouchContentLocationEventData touchContentLocationEventData:
result.AddRange(hashes.Select(t => new TouchContentLocationEventData(Sender, t, touchContentLocationEventData.AccessTime)));
break;
case UpdateMetadataEntryEventData update:
result.AddRange(hashes.Select(t => new UpdateMetadataEntryEventData(Sender, update.StrongFingerprint, update.Entry)));
break;
}
foreach (var r in result)
@ -200,7 +247,22 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
/// <inheritdoc />
public override bool Equals(object obj)
{
return EqualityComparer<ContentLocationEventData>.Default.Equals(this, obj as ContentLocationEventData);
if (ReferenceEquals(this, obj))
{
return true;
}
if (obj is null)
{
return false;
}
if (GetType() != obj.GetType())
{
return false;
}
return Equals((ContentLocationEventData)obj);
}
/// <inheritdoc />
@ -339,11 +401,27 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
/// <nodoc />
public UpdateMetadataEntryEventData(MachineId sender, BuildXLReader reader)
: base(EventKind.UpdateMetadataEntry, sender, CollectionUtilities.EmptyArray<ShortHash>())
public static UpdateMetadataEntryEventData Deserialize(ref SpanReader reader, MachineId sender)
{
StrongFingerprint = StrongFingerprint.Deserialize(reader);
Entry = MetadataEntry.Deserialize(reader);
var strongFingerprint = StrongFingerprint.Deserialize(ref reader);
var entry = MetadataEntry.Deserialize(ref reader);
return new UpdateMetadataEntryEventData(sender, strongFingerprint, entry);
}
/// <summary>
/// Reads the strong fingerprint and the metadata entry (in that order) from <paramref name="reader"/>
/// and creates an <see cref="UpdateMetadataEntryEventData"/> for the given <paramref name="sender"/>.
/// </summary>
public static UpdateMetadataEntryEventData Deserialize(BuildXLReader reader, MachineId sender)
{
    // The read order matters: the fingerprint precedes the entry in the serialized form.
    var fingerprint = StrongFingerprint.Deserialize(reader);
    var metadata = MetadataEntry.Deserialize(reader);

    return new UpdateMetadataEntryEventData(sender, fingerprint, metadata);
}
/// <inheritdoc />
public override void Serialize(ref SpanWriter writer)
{
// The base implementation writes the part that is common for all the events (kind, sender and hashes).
base.Serialize(ref writer);
// The fingerprint is written before the entry: this is the order in which
// Deserialize(ref SpanReader, MachineId) reads them back.
StrongFingerprint.Serialize(ref writer);
Entry.Serialize(ref writer);
}
/// <inheritdoc />

Просмотреть файл

@ -4,14 +4,22 @@
using System;
using System.Collections.Generic;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Hashing.FileSystemHelpers;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Tracing;
using BuildXL.Cache.ContentStore.Tracing.Internal;
using BuildXL.Utilities;
using BuildXL.Utilities.Serialization;
using BuildXL.Utilities.Core;
using BuildXL.Utilities.Core.Tasks;
using Microsoft.Azure.EventHubs;
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
{
@ -36,11 +44,32 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
Fail,
}
/// <summary>
/// Selects the implementation used for serializing and deserializing the data.
/// </summary>
/// <remarks>
/// This is a temporary switch: once the new mode is tested this enum will be removed.
/// </remarks>
public enum SerializationMode
{
    /// <summary>
    /// Serialization/deserialization based on BuildXLReader/BuildXLWriter.
    /// </summary>
    Legacy = 0,

    /// <summary>
    /// Span-based serialization/deserialization.
    /// </summary>
    SpanBased = 1,
}
/// <summary>
/// Helper class used for serialization/deserialization of <see cref="ContentLocationEventData"/> instances.
/// </summary>
public sealed class ContentLocationEventDataSerializer
{
private readonly IAbsFileSystem _fileSystem;
private readonly SerializationMode _serializationMode;
private readonly ValidationMode _validationMode;
private const string Prefix = nameof(ContentLocationEventDataSerializer);
private static readonly Tracer Tracer = new Tracer(Prefix);
@ -54,27 +83,171 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
private readonly StreamBinaryWriter _writer = new StreamBinaryWriter();
private readonly StreamBinaryReader _reader = new StreamBinaryReader();
private readonly ObjectPool<BxlArrayBufferWriter<byte>> _arrayBufferWriters = new(() => new BxlArrayBufferWriter<byte>(), bw => bw.Clear());
private readonly bool _synchronize;
/// <nodoc />
public ContentLocationEventDataSerializer(ValidationMode validationMode, bool synchronize = false)
public ContentLocationEventDataSerializer(IAbsFileSystem fileSystem, SerializationMode serializationMode, ValidationMode validationMode, bool synchronize = false)
{
_fileSystem = fileSystem;
_serializationMode = serializationMode;
_validationMode = validationMode;
_synchronize = synchronize;
}
/// <summary>
/// Serializes the <paramref name="messages"/> into a newly created file at <paramref name="path"/>
/// using the serialization mode this instance was constructed with.
/// </summary>
/// <param name="context">The operation context; used for tracing and cancellation by the span-based path.</param>
/// <param name="path">The destination file. The file is created or overwritten.</param>
/// <param name="messages">The messages to save.</param>
/// <param name="bufferSizeHint">An optional initial buffer size for the span-based serialization of each message.</param>
/// <returns>The number of bytes written to the file.</returns>
public async Task<long> SaveToFileAsync(OperationContext context, AbsolutePath path, IReadOnlyList<ContentLocationEventData> messages, int bufferSizeHint = -1)
{
    // The file is intentionally left on disk if the serialization fails:
    // the caller of this method is responsible for the cleanup.
    using FileStream stream = _fileSystem.Open(
        path,
        FileAccess.ReadWrite,
        FileMode.Create,
        FileShare.Read | FileShare.Delete,
        FileOptions.None,
        AbsFileSystemExtension.DefaultFileStreamBufferSize).ToFileStream();

    if (_serializationMode != SerializationMode.Legacy)
    {
        return await SaveToFileWithSpanAsync(context, stream, messages, bufferSizeHint);
    }

    // Legacy mode: the stream position after the write is the size of the serialized data.
    using var writer = BuildXLWriter.Create(stream, leaveOpen: true);
    SerializeEvents(writer, messages);
    return stream.Position;
}
/// <summary>
/// Serialize a given <paramref name="message"/> into a pooled byte[] instance.
/// </summary>
/// <remarks>
/// If the <paramref name="sizeHint"/> is provided then that value will be used to get the size of the pooled byte array.
/// Otherwise the size will be computed based on the estimated message size.
/// This is done for testing purposes, because we want to make sure that the serialization works even with a small original
/// buffer size and the implementation will try the bigger buffers to fit the given message.
/// </remarks>
/// <param name="context">The operation context used for tracing the buffer resizes.</param>
/// <param name="message">The message to serialize.</param>
/// <param name="sizeHint">The initial size of the buffer; a non-positive value means that the size is estimated from the message.</param>
/// <returns>
/// The pooled buffer with the serialized message and the number of bytes written into it.
/// The caller owns the returned buffer and must dispose it to return the array to the pool.
/// </returns>
internal static (PooledObjectWrapper<byte[]> buffer, int writtenLength) SerializeMessage(OperationContext context, ContentLocationEventData message, int sizeHint = -1)
{
    sizeHint = sizeHint <= 0 ? (int)(message.EstimateSerializedInstanceSize() * 2) : sizeHint;

    while (true)
    {
        var bufferHandle = Pools.GetByteArray(sizeHint);

        // Tracks whether the ownership of the buffer was transferred to the caller.
        // If it was not (the buffer was too small or the serialization failed), the buffer goes back to the pool.
        bool ownershipTransferred = false;
        try
        {
            var spanWriter = bufferHandle.Instance.AsSpan().AsWriter();
            message.Serialize(ref spanWriter);
            int writtenLength = spanWriter.WrittenBytes.Length;

            ownershipTransferred = true;
            return (bufferHandle, writtenLength);
        }
        catch (InsufficientLengthException e)
        {
            Tracer.Warning(context, $"Getting a bigger buffer to accomodate the messages size because the size '{sizeHint}' is insufficient. MinLength={e.MinLength}.");

            // We still might have more iterations because MinLength can still be small since its computed during writing a particular piece of data
            // and does not reflect the overall size that will be written to eventually.
            sizeHint = Math.Max(sizeHint, e.MinLength) * 10;
        }
        finally
        {
            if (!ownershipTransferred)
            {
                bufferHandle.Dispose();
            }
        }
    }
}
// Writes the messages into the stream using the span-based serialization:
// the compact-encoded message count goes first and is followed by the messages, one after another.
// Returns the total number of bytes written.
private async Task<long> SaveToFileWithSpanAsync(
    OperationContext context,
    FileStream stream,
    IReadOnlyList<ContentLocationEventData> messages,
    int bufferSizeHint)
{
    long totalBytes = writeMessageCount(stream, messages.Count);

    foreach (var message in messages)
    {
        var (pooledBuffer, messageLength) = SerializeMessage(context, message, bufferSizeHint);

        // The buffer goes back to the pool as soon as its content is written to the file.
        using (pooledBuffer)
        {
            await stream.WriteAsync(pooledBuffer.Instance, offset: 0, count: messageLength, context.Token);
        }

        totalBytes += messageLength;
    }

    return totalBytes;

    static long writeMessageCount(FileStream file, int messageCount)
    {
        // NOTE(review): a compact-encoded int may presumably need more than sizeof(int) bytes for very large values;
        // confirm that the pool never hands out an array of exactly sizeof(int) bytes.
        using var headerHandle = Pools.GetByteArray(minimumCapacity: sizeof(int));
        var header = headerHandle.Instance;

        var headerWriter = header.AsSpan().AsWriter();
        headerWriter.WriteCompact(messageCount);
        int headerLength = headerWriter.WrittenBytes.Length;

        // Using an API that takes byte[] and not Span<byte> because the span-based API is only available in .net core.
#pragma warning disable AsyncFixer02 // this is a synchronous method
        file.Write(header, offset: 0, count: headerLength);
#pragma warning restore AsyncFixer02
        return headerLength;
    }
}
/// <summary>
/// Reads and deserializes all the events stored in the file at <paramref name="path"/>
/// using the serialization mode this instance was constructed with.
/// </summary>
/// <param name="context">The operation context.</param>
/// <param name="path">The file to read the events from.</param>
/// <param name="deleteOnClose">If true, the file is opened with <see cref="FileOptions.DeleteOnClose"/>.</param>
/// <returns>The fully materialized list of the events.</returns>
public IReadOnlyList<ContentLocationEventData> LoadFromFile(OperationContext context, AbsolutePath path, bool deleteOnClose = true)
{
    FileOptions options = deleteOnClose ? FileOptions.DeleteOnClose : FileOptions.None;

    using var stream = _fileSystem.Open(
        path,
        FileAccess.Read,
        FileMode.Open,
        FileShare.Read | FileShare.Delete,
        options,
        AbsFileSystemExtension.DefaultFileStreamBufferSize).ToFileStream();

    if (_serializationMode != SerializationMode.Legacy)
    {
        // Span-based mode: the content of the file is exposed through a memory-mapped view and parsed in place.
        using var handle = MemoryMappedFileHandle.CreateReadOnly(stream, leaveOpen: true);
        return DeserializeEvents(handle.Content);
    }

    using var reader = BuildXLReader.Create(stream, leaveOpen: true);

    // Calling ToList to force materialization of IEnumerable to avoid access of disposed stream.
    return DeserializeEvents(reader).ToList();
}
/// <nodoc />
public IReadOnlyList<EventData> Serialize(OperationContext context, IReadOnlyList<ContentLocationEventData> eventDatas)
public IReadOnlyList<EventData> Serialize(OperationContext context, IReadOnlyList<ContentLocationEventData> messages)
{
return _serializationMode == SerializationMode.Legacy ? SerializeLegacy(context, messages) : SerializeWithSpan(context, messages);
}
private IReadOnlyList<EventData> SerializeLegacy(OperationContext context, IReadOnlyList<ContentLocationEventData> eventDatas)
{
return SynchronizeIfNeeded(
_ =>
{
var result = SerializeCore(context, eventDatas).ToList();
var result = SerializeLegacyCore(context, eventDatas).ToList();
if (_validationMode != ValidationMode.Off)
{
var deserializedEvents = result.SelectMany(e => DeserializeEvents(e, DateTime.Now)).ToList();
var deserializedEvents = result.SelectMany(e => DeserializeEventsLegacy(e, DateTime.Now)).ToList();
AnalyzeEquality(context, eventDatas, deserializedEvents);
}
return result;
});
}
private IReadOnlyList<EventData> SerializeWithSpan(OperationContext context, IReadOnlyList<ContentLocationEventData> eventDatas)
{
return SynchronizeIfNeeded(
_ =>
{
var result = SerializeWithSpanCore(context, eventDatas).ToList();
if (_validationMode != ValidationMode.Off)
{
var deserializedEvents = result.SelectMany(e => DeserializeEventsWithSpan(e, DateTime.Now)).ToList();
AnalyzeEquality(context, eventDatas, deserializedEvents);
}
@ -174,22 +347,22 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
}
private void AnalyzeEquality(OperationContext context, IReadOnlyList<ContentLocationEventData> originalEventDatas, IReadOnlyList<ContentLocationEventData> deserializedEvents)
private void AnalyzeEquality(OperationContext context, IReadOnlyList<ContentLocationEventData> originalMessages, IReadOnlyList<ContentLocationEventData> deserializedMessages)
{
// EventData entries may be split due to the size restriction.
if (!Equal(originalEventDatas, deserializedEvents))
if (!Equal(originalMessages, deserializedMessages))
{
StringBuilder builder = new StringBuilder();
var header = $"{Prefix}: Serialization equality mismatch detected.";
builder.AppendLine($"{header} Emitting original event information:");
foreach (var eventData in originalEventDatas)
foreach (var eventData in originalMessages)
{
builder.AppendLine($"{eventData.Kind}[{eventData.ContentHashes.Count}]({GetTraceInfo(eventData)})");
}
builder.AppendLine($"Deserialized event information:");
foreach (var eventData in deserializedEvents)
foreach (var eventData in deserializedMessages)
{
builder.AppendLine($"{eventData.Kind}[{eventData.ContentHashes.Count}]({GetTraceInfo(eventData)})");
}
@ -212,14 +385,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
}
private IEnumerable<EventData> SerializeCore(OperationContext context, IReadOnlyList<ContentLocationEventData> eventDatas)
private IEnumerable<EventData> SerializeLegacyCore(OperationContext context, IReadOnlyList<ContentLocationEventData> messages)
{
// TODO: Maybe a serialization format header. (bug 1365340)
var splitLargeInstancesIfNeeded = SplitLargeInstancesIfNeeded(context, eventDatas);
var splitLargeInstancesIfNeeded = SplitLargeInstancesIfNeeded(context, messages);
if (splitLargeInstancesIfNeeded.Count != eventDatas.Count)
if (splitLargeInstancesIfNeeded.Count != messages.Count)
{
context.TracingContext.Debug($"Split {eventDatas.Count} to {splitLargeInstancesIfNeeded.Count} because of size restrictions.", component: nameof(ContentLocationEventDataSerializer));
context.TracingContext.Debug($"Split {messages.Count} to {splitLargeInstancesIfNeeded.Count} because of size restrictions.", component: nameof(ContentLocationEventDataSerializer));
}
using (_writer.PreservePosition())
@ -266,14 +438,71 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
}
/// <summary>
/// Lazily packs the <paramref name="messages"/> into <see cref="EventData"/> instances using the span-based serialization.
/// </summary>
/// <remarks>
/// The messages are serialized one after another into a pooled buffer. An <see cref="EventData"/> is produced
/// when appending the next message would make the buffer larger than <c>MaxEventDataPayloadSize</c>
/// or when the last message has been serialized.
/// </remarks>
private IEnumerable<EventData> SerializeWithSpanCore(OperationContext context, IReadOnlyList<ContentLocationEventData> messages)
{
// Messages that are too large on their own are split first, so each item below is expected to fit into a single event
// (this expectation is asserted in the loop).
var splitLargeInstancesIfNeeded = SplitLargeInstancesIfNeeded(context, messages);
if (splitLargeInstancesIfNeeded.Count != messages.Count)
{
context.TracingContext.Debug($"Split {messages.Count} to {splitLargeInstancesIfNeeded.Count} because of size restrictions.", component: nameof(ContentLocationEventDataSerializer));
}
// The buffer writer is taken from the pool and is held until the iteration is finished.
using var handler = _arrayBufferWriters.GetInstance();
var arrayBufferWriter = handler.Instance;
// Index of the message that is serialized next. It is advanced only when the message stays in the current batch.
int currentCount = 0;
while (currentCount < splitLargeInstancesIfNeeded.Count)
{
// Append the current message after the messages that are already in the batch.
int oldOffset = arrayBufferWriter.WrittenCount;
var spanWriter = new SpanWriter(arrayBufferWriter);
splitLargeInstancesIfNeeded[currentCount].Serialize(ref spanWriter);
int newOffset = arrayBufferWriter.WrittenCount;
// The size of the message that was just written.
int eventSize = newOffset - oldOffset;
Contract.Assert(
eventSize <= MaxEventDataPayloadSize,
$"No mitigation for single {splitLargeInstancesIfNeeded[currentCount].Kind} event that is too large");
bool isLast = currentCount == (splitLargeInstancesIfNeeded.Count - 1);
// True when the batch together with the current message no longer fits into a single event.
bool isOverflow = newOffset > MaxEventDataPayloadSize;
if (isOverflow || isLast)
{
if (isOverflow)
{
// Need to change the offset for the overflow case, but not if the last element is serialized.
// The current message is excluded from this batch; 'currentCount' is not advanced,
// so the same message is serialized again as the first message of the next batch.
arrayBufferWriter.SetPosition(oldOffset);
}
// The written data is copied into a new array, so the pooled buffer can be reused for the next batch.
var eventData = new EventData(arrayBufferWriter.WrittenSpan.ToArray());
// We don't need to clear the buffer, just set the position to the beginning.
arrayBufferWriter.SetPosition(0);
yield return eventData;
if (!isOverflow)
{
// Need to break because we don't increment current count
yield break;
}
}
else
{
// The message fits: keep it in the batch and move on to the next one.
currentCount++;
}
}
}
/// <nodoc />
public void SerializeEvents(BuildXLWriter writer, IReadOnlyList<ContentLocationEventData> eventDatas)
public void SerializeEvents(BuildXLWriter writer, IReadOnlyList<ContentLocationEventData> messages)
{
SynchronizeIfNeeded(
_ =>
{
writer.WriteCompact(eventDatas.Count);
foreach (var eventData in eventDatas)
writer.WriteCompact(messages.Count);
foreach (var eventData in messages)
{
eventData.Serialize(writer);
}
@ -283,22 +512,54 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
/// <nodoc />
public IEnumerable<ContentLocationEventData> DeserializeEvents(BuildXLReader reader)
public void SerializeEvents(ref SpanWriter writer, IReadOnlyList<ContentLocationEventData> messages)
{
    // The lock is managed by hand and not via the SynchronizeIfNeeded helpers,
    // because 'writer' is passed by reference and can't be captured in a delegate.
    bool lockTaken = false;
    try
    {
        if (_synchronize)
        {
            Monitor.Enter(this, ref lockTaken);
        }

        // The number of messages goes first and is followed by the messages themselves.
        writer.WriteCompact(messages.Count);
        foreach (var message in messages)
        {
            message.Serialize(ref writer);
        }
    }
    finally
    {
        if (lockTaken)
        {
            Monitor.Exit(this);
        }
    }
}
private List<ContentLocationEventData> DeserializeEvents(ReadOnlySpan<byte> content)
{
return SynchronizeIfNeeded(
synchronized =>
content,
static content =>
{
// Need to "materialize" the result if synchronization is needed.
// Otherwise the lock will be released before all the data is consumed from the
if (synchronized)
var reader = content.AsReader();
var entriesCount = reader.ReadInt32Compact();
var result = new List<ContentLocationEventData>();
for (int i = 0; i < entriesCount; i++)
{
return deserializeEventsCore().ToList();
}
else
{
return deserializeEventsCore();
// Using default as eventTimeUtc because reconciliation events should not have touches.
result.Add(ContentLocationEventData.Deserialize(ref reader, eventTimeUtc: default));
}
return result;
});
}
private List<ContentLocationEventData> DeserializeEvents(BuildXLReader reader)
{
return SynchronizeIfNeeded(_ => deserializeEventsCore().ToList());
IEnumerable<ContentLocationEventData> deserializeEventsCore()
{
@ -314,6 +575,14 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
/// <summary>
/// Deserializes the events stored in the body of <paramref name="message"/> using the serialization mode
/// this instance was constructed with.
/// </summary>
/// <param name="message">The event hub message with the serialized events.</param>
/// <param name="eventTimeUtc">
/// The time of the event. If not provided, the mode-specific implementation falls back to the enqueue time of the <paramref name="message"/>.
/// </param>
public IReadOnlyList<ContentLocationEventData> DeserializeEvents(EventData message, DateTime? eventTimeUtc = null)
{
    // The legacy branch must dispatch to DeserializeEventsLegacy: calling DeserializeEvents here
    // would resolve to this very method and recurse without ever terminating.
    return _serializationMode == SerializationMode.Legacy
        ? DeserializeEventsLegacy(message, eventTimeUtc)
        : DeserializeEventsWithSpan(message, eventTimeUtc);
}
/// <nodoc />
public IReadOnlyList<ContentLocationEventData> DeserializeEventsLegacy(EventData message, DateTime? eventTimeUtc = null)
{
return SynchronizeIfNeeded(
_ =>
@ -329,6 +598,28 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
});
}
// Span-based deserialization of all the events stored in the body of the message.
// The events are read one after another until the end of the body is reached.
private IReadOnlyList<ContentLocationEventData> DeserializeEventsWithSpan(EventData message, DateTime? eventTimeUtc = null)
{
    return SynchronizeIfNeeded(
        _ =>
        {
            // Fall back to the enqueue time of the message when the event time is not given explicitly.
            if (!eventTimeUtc.HasValue)
            {
                Contract.Assert(message.SystemProperties != null, "Either eventTimeUtc argument must be provided or message.SystemProperties must not be null. Did you forget to provide eventTimeUtc arguments in tests?");
                eventTimeUtc = message.SystemProperties.EnqueuedTimeUtc;
            }

            var spanReader = message.Body.AsSpan().AsReader();
            var events = new List<ContentLocationEventData>();

            while (!spanReader.IsEnd)
            {
                var current = ContentLocationEventData.Deserialize(ref spanReader, eventTimeUtc.Value);
                events.Add(current);
            }

            return events;
        });
}
/// <nodoc />
public static IReadOnlyList<ContentLocationEventData> SplitLargeInstancesIfNeeded(OperationContext context, IReadOnlyList<ContentLocationEventData> source)
{
@ -376,5 +667,22 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
return func(false);
}
}
private delegate T ParseData<T>(ReadOnlySpan<byte> content);
// Invokes 'func' with the given data, holding the lock on this instance when the synchronization is enabled.
// A dedicated overload is needed because a span can't be captured by a lambda expression.
private T SynchronizeIfNeeded<T>(ReadOnlySpan<byte> data, ParseData<T> func)
{
    if (!_synchronize)
    {
        return func(data);
    }

    lock (this)
    {
        return func(data);
    }
}
}
}

Просмотреть файл

@ -10,6 +10,7 @@ using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.FileSystem;
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Cache.ContentStore.Hashing.FileSystemHelpers;
using BuildXL.Cache.ContentStore.Interfaces.Extensions;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
@ -63,7 +64,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
private readonly CentralStorage _storage;
private readonly Interfaces.FileSystem.AbsolutePath _workingDirectory;
private readonly IAbsFileSystem _fileSystem;
protected readonly IAbsFileSystem FileSystem;
private readonly DisposableDirectory _workingDisposableDirectory;
protected readonly TimeSpan?[] EventQueueDelays;
@ -89,9 +90,9 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
IClock clock)
{
_configuration = configuration;
_fileSystem = new PassThroughFileSystem();
FileSystem = new PassThroughFileSystem();
_storage = centralStorage;
_workingDisposableDirectory = new DisposableDirectory(_fileSystem, workingDirectory);
_workingDisposableDirectory = new DisposableDirectory(FileSystem, workingDirectory);
_workingDirectory = workingDirectory;
EventQueueDelays = new TimeSpan?[configuration.MaxEventProcessingConcurrency];
EventHandler = eventHandler;
@ -100,7 +101,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
Tracer = tracer;
ValidationMode validationMode = configuration.SelfCheckSerialization ? (configuration.SelfCheckSerializationShouldFail ? ValidationMode.Fail : ValidationMode.Trace) : ValidationMode.Off;
SerializationMode serializationMode = configuration.UseSpanBasedSerialization ? SerializationMode.SpanBased : SerializationMode.Legacy;
// EventDataSerializer is not thread-safe.
// This is usually not a problem, because the nagle queue that is used by this class
// kind of guarantees that it would be just a single thread responsible for sending the events
@ -110,7 +111,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
// In this case this method can be called from multiple threads causing serialization/deserialization issues.
// So to prevent random test failures because of the state corruption we're using lock
// if the batch size is 1.
EventDataSerializer = new ContentLocationEventDataSerializer(validationMode, synchronize: _configuration.EventBatchSize == 1);
EventDataSerializer = new ContentLocationEventDataSerializer(FileSystem, serializationMode, validationMode, synchronize: _configuration.EventBatchSize == 1);
}
/// <summary>
@ -247,18 +248,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
var blobName = blobEvent.BlobId;
await _storage.TryGetFileAsync(context, blobName, blobFilePath).ThrowIfFailure();
using var stream = _fileSystem.Open(
blobFilePath,
FileAccess.Read,
FileMode.Open,
FileShare.Read | FileShare.Delete,
FileOptions.DeleteOnClose,
AbsFileSystemExtension.DefaultFileStreamBufferSize);
using var reader = BuildXLReader.Create(stream, leaveOpen: true);
// Calling ToList to force materialization of IEnumerable to avoid access of disposed stream.
return EventDataSerializer.DeserializeEvents(reader).ToList();
return EventDataSerializer.LoadFromFile(context, blobFilePath);
}
}
@ -477,13 +467,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
try
{
long size = 0;
using (Stream stream = _fileSystem.Open(blobFilePath, FileAccess.ReadWrite, FileMode.Create, FileShare.Read | FileShare.Delete, FileOptions.None, AbsFileSystemExtension.DefaultFileStreamBufferSize))
using (var writer = BuildXLWriter.Create(stream, leaveOpen: true))
{
EventDataSerializer.SerializeEvents(writer, eventDatas);
size = stream.Position;
}
long size = await EventDataSerializer.SaveToFileAsync(context, blobFilePath, eventDatas);
// Uploading the checkpoint
var storageIdResult = await _storage.UploadFileAsync(context, blobFilePath, blobName).ThrowIfFailure();
@ -495,7 +479,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
}
finally
{
_fileSystem.DeleteFile(blobFilePath);
// Safely deleting the file to avoid masking an original exception in case the try block fails.
FileSystem.TryDeleteFile(context.TracingContext, blobFilePath);
}
},
Counters[PublishLargeEvent],

Просмотреть файл

@ -44,6 +44,11 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
/// </summary>
public bool SelfCheckSerialization { get; set; } = false;
/// <summary>
/// Selects the serialization implementation: if true (the default) then the new span-based
/// serialization/deserialization mode is used, otherwise the old BuildXLReader/Writer-based approach is used.
/// </summary>
public bool UseSpanBasedSerialization { get; set; } = true;
/// <summary>
/// If enabled, self-check serialization failures will trigger an error instead of just getting traced.
/// </summary>

Просмотреть файл

@ -11,6 +11,7 @@ using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Distributed.Tracing;
using BuildXL.Cache.ContentStore.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.Tracing;
@ -89,7 +90,13 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
.Select(
(_, index) =>
{
var serializer = new ContentLocationEventDataSerializer(configuration.SelfCheckSerialization ? (configuration.SelfCheckSerializationShouldFail ? ValidationMode.Fail : ValidationMode.Trace) : ValidationMode.Off);
ValidationMode validationMode = configuration.SelfCheckSerialization
? (configuration.SelfCheckSerializationShouldFail ? ValidationMode.Fail : ValidationMode.Trace)
: ValidationMode.Off;
SerializationMode serializationMode = configuration.UseSpanBasedSerialization
? SerializationMode.SpanBased
: SerializationMode.Legacy;
var serializer = new ContentLocationEventDataSerializer(FileSystem, serializationMode, validationMode);
return ActionBlockSlim.CreateWithAsyncAction<ProcessEventsInput>(
new ActionBlockSlimConfiguration(
DegreeOfParallelism: 1,
@ -123,7 +130,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache.EventStreaming
/// <inheritdoc />
protected override async Task<BoolResult> StartupCoreAsync(OperationContext context)
{
Tracer.Info(context, $"Initializing Event Hub-based content location event store with epoch '{_configuration.Epoch}'.");
Tracer.Info(context, $"Initializing Event Hub-based content location event store with epoch '{_configuration.Epoch}', UseSpanBasedSerialization={_configuration.UseSpanBasedSerialization}.");
var baseInitializeResult = await base.StartupCoreAsync(context);
if (!baseInitializeResult)

Просмотреть файл

@ -454,17 +454,17 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
if (_latestAccessTime != DefaultInvalidLatestAccessTime || _size >= 0)
{
writer.WriteUInt32Compact(_latestAccessTime);
writer.WriteCompact(_latestAccessTime);
bool mustSerializationEarliestTime = _earliestAccessTime != DefaultInvalidEarliestTime && _earliestAccessTime != _latestAccessTime;
var finalSize = Math.Max(_size, -1) + 1;
if (finalSize > 0 || mustSerializationEarliestTime)
{
writer.WriteInt64Compact(finalSize);
writer.WriteCompact(finalSize);
if (mustSerializationEarliestTime)
{
writer.WriteUInt32Compact(_earliestAccessTime);
writer.WriteCompact(_earliestAccessTime);
}
}
}

Просмотреть файл

@ -4,6 +4,8 @@
using System;
using System.Diagnostics.ContractsLight;
using System.IO;
using BuildXL.Utilities;
using BuildXL.Utilities.Serialization;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
@ -60,12 +62,25 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
return new MachineId(index);
}
/// <summary>
/// Reads a 32-bit machine index from <paramref name="reader"/> and wraps it into a <see cref="MachineId"/>.
/// </summary>
public static MachineId Deserialize(ref SpanReader reader) => new MachineId(reader.ReadInt32());
/// <summary>
/// Returns true if this instance is not equal to <see cref="Invalid"/>.
/// </summary>
public bool IsValid() => !Equals(Invalid);
/// <summary>
/// Writes the machine index into <paramref name="writer"/>.
/// </summary>
public void Serialize(ref SpanWriter writer) => writer.Write(Index);
/// <nodoc />
public void Serialize(BinaryWriter writer)
{

Просмотреть файл

@ -48,7 +48,7 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
}
/// <nodoc />
public static long DeserializeLastAccessTimeUtc(BuildXLReader reader)
public static long DeserializeLastAccessTimeUtc(ref SpanReader reader)
{
return reader.ReadInt64Compact();
}
@ -59,5 +59,12 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
writer.WriteCompact(LastAccessTimeUtc.ToFileTimeUtc());
ContentHashListWithDeterminism.Serialize(writer);
}
/// <summary>
/// Span-based serialization: writes the last access time (compact-encoded UTC file time)
/// followed by the content hash list with determinism.
/// </summary>
public void Serialize(ref SpanWriter writer)
{
    long lastAccessFileTimeUtc = LastAccessTimeUtc.ToFileTimeUtc();
    writer.WriteCompact(lastAccessFileTimeUtc);

    ContentHashListWithDeterminism.Serialize(ref writer);
}
}
}

Просмотреть файл

@ -1058,7 +1058,8 @@ namespace BuildXL.Cache.ContentStore.Distributed.NuCache
private long DeserializeMetadataLastAccessTimeUtc(ReadOnlySpan<byte> data)
{
return SerializationPool.Deserialize(data, static reader => MetadataEntry.DeserializeLastAccessTimeUtc(reader));
var reader = data.AsReader();
return MetadataEntry.DeserializeLastAccessTimeUtc(ref reader);
}
/// <inheritdoc />

Просмотреть файл

@ -0,0 +1,34 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Utilities.Serialization;
namespace BuildXL.Cache.ContentStore.Distributed.NuCache
{
/// <summary>
/// Span-based serialization helpers for <see cref="ShortHash"/> and <see cref="ContentHash"/>.
/// </summary>
public static class ShortHashExtensions
{
    /// <summary>
    /// Writes <paramref name="value"/> using the writer's generic <c>Write&lt;T&gt;</c> method.
    /// </summary>
    public static void Write(this ref SpanWriter writer, in ShortHash value)
    {
        writer.Write<ShortHash>(value);
    }

    /// <summary>
    /// Reads up to <see cref="ShortHash.SerializedLength"/> bytes from <paramref name="reader"/>
    /// (an incomplete tail is allowed) and converts them into a <see cref="ShortHash"/>.
    /// </summary>
    public static ShortHash ReadShortHash(this ref SpanReader reader)
    {
        var span = reader.ReadSpan(ShortHash.SerializedLength, allowIncomplete: true);
        return ShortHash.FromSpan(span);
    }

    /// <summary>
    /// Writes the full serialized form of <paramref name="value"/> and advances the writer
    /// by the number of bytes actually produced.
    /// </summary>
    public static void Write(this ref SpanWriter writer, in ContentHash value)
    {
        // ContentHash.Serialize(Span) emits the hash-type byte followed by the hash bytes, and the
        // underlying byte copy silently truncates when the target span is too short. Reserve the full
        // serialized size (the same bound Selector.Serialize uses) instead of MaxHashByteLength,
        // which presumably does not account for the leading hash-type byte.
        writer.EnsureLength(ContentHash.SerializedLength);
        var writtenBytes = value.Serialize(writer.Remaining);
        writer.Advance(writtenBytes);
    }
}
}

Просмотреть файл

@ -6,7 +6,6 @@ using System.Buffers;
using System.Linq;
using BuildXL.Cache.ContentStore.Distributed.NuCache;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Utilities.Serialization;
using FluentAssertions;
using Xunit;
using Xunit.Abstractions;

Просмотреть файл

@ -16,18 +16,20 @@ using Xunit;
using Xunit.Abstractions;
using System.Diagnostics.ContractsLight;
using System.Collections.Generic;
using BuildXL.Cache.ContentStore.Distributed.Utilities;
using System.Diagnostics;
using BuildXL.Cache.MemoizationStore.Interfaces.Sessions;
using BuildXL.Cache.ContentStore.Tracing;
using System.Threading.Tasks;
using BuildXL.Cache.ContentStore.Interfaces.Results;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Time;
using BuildXL.Cache.ContentStore.UtilitiesCore;
using System.IO;
using System.Threading;
using BuildXL.Cache.ContentStore.InterfacesTest.Results;
using BuildXL.Cache.ContentStore.Utils;
using BuildXL.Utilities.Serialization;
using Microsoft.Azure.EventHubs;
using AbsolutePath = BuildXL.Cache.ContentStore.Interfaces.FileSystem.AbsolutePath;
namespace ContentStoreTest.Distributed.ContentLocation.NuCache
{
@ -39,6 +41,90 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
{
}
[Fact]
public async Task SerializeWithSpansAndSmallInitialFileSize()
{
    var operationContext = OperationContext();
    var targetPath = TestRootDirectoryPath / "Tmp.txt";
    var events = new[] { GenerateRandomEventData(0, numberOfHashes: 50_000, DateTime.Now) };
    var spanSerializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization: true);

    // A single large event is saved with a 1-byte buffer size hint.
    var bytesWritten = await spanSerializer.SaveToFileAsync(operationContext, targetPath, events, bufferSizeHint: 1);

    // Ending up with more than the hinted size means that the memory-mapped file
    // had to be re-opened during serialization.
    bytesWritten.Should().BeGreaterThan(1);
}
[Fact]
public async Task TaskTestFileSerializationDeserializationBackwardCompatibility()
{
    const int eventContentCount = 0;
    var operationContext = OperationContext();
    var filePath = TestRootDirectoryPath / "Tmp.txt";

    // Only the generator with index 2 is exercised. The generators with indices 0, 1 and 3 are
    // currently disabled; touches are not used because the touch time is not serialized.
    var events = new List<ContentLocationEventData>
    {
        GenerateRandomEventData(2, numberOfHashes: eventContentCount, DateTime.Now),
    };

    var legacySerializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization: false);
    var spanSerializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization: true);

    // Legacy write -> legacy read.
    // Saving to a file does not split large events, so the number of events is preserved.
    var legacyFileSize = await legacySerializer.SaveToFileAsync(operationContext, filePath, events);
    var roundTripped = legacySerializer.LoadFromFile(operationContext, filePath, deleteOnClose: false);
    roundTripped.Count.Should().Be(events.Count);
    roundTripped.Should().BeEquivalentTo(events);

    // Legacy write -> span-based read.
    roundTripped = spanSerializer.LoadFromFile(operationContext, filePath, deleteOnClose: false);
    roundTripped.Should().BeEquivalentTo(events);

    // Span-based write -> span-based read; the file must have the same size as the legacy one.
    var spanFileSize = await spanSerializer.SaveToFileAsync(operationContext, filePath, events);
    spanFileSize.Should().Be(legacyFileSize);
    roundTripped = spanSerializer.LoadFromFile(operationContext, filePath, deleteOnClose: false);
    roundTripped.Should().BeEquivalentTo(events);
}
[Theory(Skip = "For manual testing only")]
[InlineData(true)]
[InlineData(false)]
public async Task BenchmarkSerialization(bool useSpanBasedSerialization)
{
    const int iterations = 1000;

    var operationContext = OperationContext();
    var warmupPath = TestRootDirectoryPath / "Tmp.txt";
    var events = new[] { GenerateRandomEventData(0, numberOfHashes: 50_000, DateTime.Now) };
    var serializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization, ValidationMode.Off);

    // Warm-up run that is excluded from the measurements.
    await serializer.SaveToFileAsync(operationContext, warmupPath, events);

    // Write phase: one file per iteration.
    var writeStopwatch = Stopwatch.StartNew();
    for (int fileIndex = 0; fileIndex < iterations; fileIndex++)
    {
        await serializer.SaveToFileAsync(operationContext, TestRootDirectoryPath / $"{fileIndex}.txt", events);
    }

    var writeDuration = writeStopwatch.ElapsedMilliseconds;

    // Read phase: loads back the files produced by the write phase.
    var readStopwatch = Stopwatch.StartNew();
    for (int fileIndex = 0; fileIndex < iterations; fileIndex++)
    {
        serializer.LoadFromFile(operationContext, TestRootDirectoryPath / $"{fileIndex}.txt");
    }

    // Failing on purpose: the failure message is the easiest way to surface the timings.
    true.Should().BeFalse($"Mode: {(useSpanBasedSerialization ? "Span-based" : "Legacy")}, WriteDuration: {writeDuration}ms, ReadDuration: {readStopwatch.ElapsedMilliseconds}ms");
}
/// <summary>
/// Theory data: one row per <see cref="EventKind"/> value, holding the value and its name.
/// </summary>
public static IEnumerable<object[]> EventKinds =>
    Enum.GetValues(typeof(EventKind))
        .OfType<EventKind>()
        .Select(kind => new object[] { kind, kind.ToString() });
[Theory]
@ -73,18 +159,14 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
await sendAndVerifyLargeEvent(kind);
using var stream = new MemoryStream();
using var writer = BuildXL.Utilities.Core.BuildXLWriter.Create(stream);
var arrayBuffer = serializeIntoBuffer(serializer, harness.Events);
serializer.SerializeEvents(writer, harness.Events);
stream.Position.Should().BeGreaterThan(ContentLocationEventDataSerializer.MaxEventDataPayloadSize,
arrayBuffer.WrittenCount.Should().BeGreaterThan(ContentLocationEventDataSerializer.MaxEventDataPayloadSize,
"Event should be larger than max event payload size to properly test serialization logic");
bool canSplit = kind == EventKind.AddLocation
|| kind == EventKind.AddLocationWithoutTouching
|| kind == EventKind.RemoveLocation
|| kind == EventKind.RemoveLocation
|| kind == EventKind.Touch;
foreach (var eventData in harness.Events)
@ -117,13 +199,21 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
harness.State.DownloadedCount.Should().Be(1);
harness.State.UploadedCount.Should().Be(1);
harness.State.UploadedSize.Should().BeGreaterOrEqualTo(stream.Position);
harness.State.UploadedSize.Should().BeGreaterOrEqualTo(arrayBuffer.WrittenCount);
harness.State.UploadedSize.Should().Be(harness.State.DownloadedSize);
}
// Serializes the given events through a SpanWriter into a fresh growable buffer and returns that buffer.
static BxlArrayBufferWriter<byte> serializeIntoBuffer(ContentLocationEventDataSerializer serializer, List<ContentLocationEventData> events)
{
    var buffer = new BxlArrayBufferWriter<byte>();
    var spanWriter = new SpanWriter(buffer);

    serializer.SerializeEvents(ref spanWriter, events);

    return buffer;
}
async Task sendAndVerifyLargeEvent(EventKind kind)
{
const int largeEventContentCount = 50000;
const int largeEventContentCount = 50_000;
switch (kind)
{
@ -231,7 +321,7 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
var serializer = CreateContentLocationEventDataSerializer();
var largeMessage = GenerateRandomEventData(0, numberOfHashesPerItem, touchTime);
var serializedMessages = serializer.Serialize(OperationContext(), new[] { largeMessage }).ToList();
var serializedMessages = Serialize(serializer, new[] { largeMessage });
// Round trip validation is performed by the serializer
Output.WriteLine($"Number of serialized records: {serializedMessages.Count}");
@ -239,13 +329,13 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
}
[Fact]
public void TwoHunderdEventsShouldBeSerializedIntoOneEventDAta()
public void TwoHunderdEventsShouldBeSerializedIntoOneEventData()
{
DateTime touchTime = DateTime.UtcNow;
var serializer = CreateContentLocationEventDataSerializer();
var largeMessage = Enumerable.Range(1, 200).Select<int, ContentLocationEventData>(n => GenerateRandomEventData(0, numberOfHashes: 2, touchTime: touchTime)).ToArray();
var serializedMessages = serializer.Serialize(OperationContext(), largeMessage).ToList();
var serializedMessages = Serialize(serializer, largeMessage);
// Round trip validation is performed by the serializer
Output.WriteLine($"Number of serialized records: {serializedMessages.Count}");
@ -259,11 +349,11 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
const int numberOfHashesPerItem = 200;
DateTime touchTime = DateTime.UtcNow;
var serializer = CreateContentLocationEventDataSerializer();
var serializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization: false);
var messages = Enumerable.Range(1, numberOfItems).Select<int, ContentLocationEventData>(n => GenerateRandomEventData(n, numberOfHashesPerItem, touchTime)).ToArray();
// Round trip validation is performed by the serializer
var serializedMessages = serializer.Serialize(OperationContext(), messages).ToList();
var serializedMessages = Serialize(serializer, messages);
Output.WriteLine($"Number of serialized records: {serializedMessages.Count}");
serializedMessages.Count.Should().NotBe(1);
@ -280,23 +370,109 @@ namespace ContentStoreTest.Distributed.ContentLocation.NuCache
var messages = Enumerable.Range(1, numberOfItems).Select<int, ContentLocationEventData>(n => GenerateRandomEventData(n, numberOfHashesPerItem, touchTime)).ToArray();
// Round trip validation is performed by the serializer
var serializedMessages = serializer.Serialize(OperationContext(), messages).ToList();
var serializedMessages = Serialize(serializer, messages);
serializedMessages.Count.Should().Be(1); // All the cases we have should fit into one message.
}
[Fact]
public void SerializationRoundtripWithTouch()
{
    DateTime touchTime = DateTime.UtcNow;
    var serializer = CreateContentLocationEventDataSerializer();

    bool parsed = ShortHash.TryParse("VSO0:84ABF22908660978EE63C9", out var hash);
    parsed.Should().BeTrue();

    var touchEvent = new TouchContentLocationEventData(42.AsMachineId(), new ShortHash[] { hash }, touchTime);

    // Round trip validation is performed by the serializer.
    var serializedEvents = Serialize(serializer, new[] { touchEvent });

    // A single small event must fit into one message.
    serializedEvents.Count.Should().Be(1);
}
[Fact]
public void SerializationRoundtripWithMetadata()
{
    var serializer = CreateContentLocationEventDataSerializer();

    // The value is passed both as 'lastAccessTimeUtc' and as the event time used for deserialization,
    // so it must be a UTC timestamp rather than a local one.
    DateTime eventTime = DateTime.UtcNow;

    var metadataEntry = new MetadataEntry(
        new ContentHashListWithDeterminism(
            new ContentHashList(
                contentHashes: new ContentHash[] { ContentHash.Random() },
                payload: ContentHash.Random().ToByteArray()),
            determinism: CacheDeterminism.SinglePhaseNonDeterministic),
        lastAccessTimeUtc: eventTime);
    var message = new UpdateMetadataEntryEventData(42.AsMachineId(), StrongFingerprint.Random(), metadataEntry);

    // Round trip validation is performed by the serializer
    var serializedMessages = Serialize(serializer, new [] { message });
    serializedMessages.Count.Should().Be(1); // All the cases we have should fit into one message.

    // Explicit round trip: the deserialized event must be equal to the original one.
    var deserialized = serializer.DeserializeEvents(serializedMessages[0], eventTime);
    deserialized.Count.Should().Be(1);
    deserialized[0].Should().Be(message);
}
[Fact]
public async Task SerializationRoundtripWithMetadataLarge()
{
    var context = OperationContext();

    // The two serializers must write to different files. With a shared path the legacy output
    // overwrites the span-based one and the byte-by-byte comparison below compares a file with itself.
    var path = TestRootDirectoryPath / "Tmp.txt";
    var legacyPath = TestRootDirectoryPath / "TmpLegacy.txt";

    var serializer = CreateContentLocationEventDataSerializer();
    var legacySerializer = CreateContentLocationEventDataSerializer(useSpanBasedSerialization: false);
    var message = GenerateMetadataEvent(42.AsMachineId(), 50_000);

    var length1 = await serializer.SaveToFileAsync(context, path, new[] {message});
    var length2 = await legacySerializer.SaveToFileAsync(context, legacyPath, new[] {message});
    length2.Should().Be(length1);

    // Both serialization modes must produce identical bytes.
    var data = FileSystem.ReadAllBytes(path);
    var data2 = FileSystem.ReadAllBytes(legacyPath);
    data.AsSpan().SequenceEqual(data2).Should().BeTrue();

    // The span-based output must deserialize back to the original event.
    var deserialized = serializer.LoadFromFile(context, path);
    deserialized.Count.Should().Be(1);
    deserialized[0].Should().Be(message);
}
// Serializes the messages with a fresh operation context and materializes the resulting event data into a list.
private static List<EventData> Serialize(ContentLocationEventDataSerializer serializer, IReadOnlyList<ContentLocationEventData> messages)
    => serializer.Serialize(OperationContext(), messages).ToList();
private static ContentLocationEventData GenerateRandomEventData(int index, int numberOfHashes, DateTime touchTime)
{
var random = new Random(index);
var hashesAndSizes = Enumerable.Range(1, numberOfHashes).Select(n => (hash: new ShortHash(ContentHash.Random()), size: (long)random.Next(10_000_000))).ToList();
return (index % 3) switch
return (index % 4) switch
{
0 => (ContentLocationEventData)new AddContentLocationEventData(new MachineId(index), hashesAndSizes.SelectArray(n => n.hash), hashesAndSizes.SelectArray(n => n.size)),
1 => new TouchContentLocationEventData(new MachineId(index), hashesAndSizes.SelectArray(n => n.hash), touchTime),
2 => GenerateMetadataEvent(new MachineId(index), numberOfHashes),
_ => new RemoveContentLocationEventData(new MachineId(index), hashesAndSizes.SelectArray(n => n.hash)),
};
}
private static ContentLocationEventDataSerializer CreateContentLocationEventDataSerializer() => new ContentLocationEventDataSerializer(ValidationMode.Fail);
// Creates an update-metadata event from 'sender' with a random strong fingerprint and a content hash list
// of 'largeEventContentCount' random hashes, stamped with the current UTC time.
private static ContentLocationEventData GenerateMetadataEvent(MachineId sender, int largeEventContentCount)
{
    var contentHashes = Enumerable.Range(0, largeEventContentCount).Select(_ => ContentHash.Random()).ToArray();
    var strongFingerprint = StrongFingerprint.Random();

    var hashListWithDeterminism = new ContentHashListWithDeterminism(new ContentHashList(contentHashes), CacheDeterminism.None);
    var entry = new MetadataEntry(hashListWithDeterminism, DateTime.UtcNow);

    return new UpdateMetadataEntryEventData(sender, strongFingerprint, entry);
}
// Creates a serializer over the test file system, in span-based or legacy mode, with the given validation mode.
private ContentLocationEventDataSerializer CreateContentLocationEventDataSerializer(
    bool useSpanBasedSerialization = true,
    ValidationMode validationMode = ValidationMode.Fail)
{
    var serializationMode = useSpanBasedSerialization ? SerializationMode.SpanBased : SerializationMode.Legacy;
    return new ContentLocationEventDataSerializer(FileSystem, serializationMode, validationMode);
}
// Creates a new operation context backed by the global test logger.
private static OperationContext OperationContext()
{
    var context = new Context(TestGlobal.Logger);
    return new OperationContext(context);
}

Просмотреть файл

@ -306,23 +306,36 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// <summary>
/// Serialize to a span.
/// </summary>
public void Serialize(Span<byte> buffer, int offset = 0, SerializeHashBytesMethod serializeMethod = SerializeHashBytesMethod.Trimmed)
public int Serialize(Span<byte> buffer, int offset = 0, SerializeHashBytesMethod serializeMethod = SerializeHashBytesMethod.Trimmed)
{
var length = serializeMethod == SerializeHashBytesMethod.Trimmed ? ByteLength : MaxHashByteLength;
Serialize(buffer, offset, length);
return Serialize(buffer, offset, length);
}
/// <summary>
/// Serialize to a span.
/// </summary>
public void Serialize(Span<byte> buffer, int offset, int length)
public int Serialize(Span<byte> buffer, int offset, int length)
{
unchecked
{
buffer[offset++] = (byte)_hashType;
}
_bytes.Serialize(buffer.Slice(offset), length);
return _bytes.Serialize(buffer.Slice(offset), length) + 1;
}
/// <summary>
/// Serializes the whole value into <paramref name="targetSpan"/>: the hash type byte first,
/// followed by the hash bytes.
/// </summary>
/// <returns>The total number of bytes written, including the hash type byte.</returns>
public int Serialize(Span<byte> targetSpan)
{
    targetSpan[0] = unchecked((byte)_hashType);

    int hashBytesWritten = _bytes.Serialize(targetSpan.Slice(1));
    return hashBytesWritten + 1;
}
/// <summary>

Просмотреть файл

@ -326,7 +326,7 @@ namespace BuildXL.Cache.ContentStore.Hashing
/// <summary>
/// Serialize value to a buffer.
/// </summary>
public void Serialize(Span<byte> buffer, int length = MaxLength)
public int Serialize(Span<byte> buffer, int length = MaxLength)
{
var len = Math.Min(length, Math.Min(buffer.Length, MaxLength));
@ -334,6 +334,8 @@ namespace BuildXL.Cache.ContentStore.Hashing
{
AsSpan(s, len).CopyTo(buffer);
}
return len;
}
/// <summary>

Просмотреть файл

@ -1,10 +1,13 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Diagnostics.ContractsLight;
using System.IO;
using System.Linq;
using BuildXL.Cache.ContentStore.Interfaces.FileSystem;
using BuildXL.Cache.ContentStore.Interfaces.Tracing;
using BuildXL.Cache.ContentStore.Tracing;
using FileInfo = BuildXL.Cache.ContentStore.Interfaces.FileSystem.FileInfo;
namespace BuildXL.Cache.ContentStore.FileSystem
@ -14,6 +17,7 @@ namespace BuildXL.Cache.ContentStore.FileSystem
/// </summary>
public static class FileSystemExtensions
{
private static readonly Tracer Tracer = new Tracer(nameof(FileSystemExtensions));
/// <summary>
/// Remove all contents, subdirectories and files, from a directory.
/// </summary>
@ -46,5 +50,22 @@ namespace BuildXL.Cache.ContentStore.FileSystem
fileSystem.DeleteFile(fileInfo.FullPath);
}
}
/// <summary>
/// Attempts to delete the file at <paramref name="path"/>. Any exception is traced as a warning
/// instead of being propagated.
/// </summary>
/// <returns>True when the deletion succeeded; false when an exception was caught.</returns>
public static bool TryDeleteFile(this IAbsFileSystem fileSystem, Context context, AbsolutePath path)
{
    bool deleted;
    try
    {
        fileSystem.DeleteFile(path);
        deleted = true;
    }
    catch (Exception e)
    {
        Tracer.Warning(context, e, $"Failed to delete file '{path}'");
        deleted = false;
    }

    return deleted;
}
}
}

Просмотреть файл

@ -0,0 +1,33 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using BuildXL.Cache.ContentStore.Hashing;
using BuildXL.Utilities.Serialization;
namespace BuildXL.Cache.ContentStore.Utils
{
/// <summary>
/// Span-based serialization helpers for <see cref="ShortHash"/> and <see cref="ContentHash"/>.
/// </summary>
public static class HashSerializationExtensions
{
    /// <summary>
    /// Writes <paramref name="value"/> using the writer's generic <c>Write&lt;T&gt;</c> method.
    /// </summary>
    public static void Write(this ref SpanWriter writer, in ShortHash value)
    {
        writer.Write<ShortHash>(value);
    }

    /// <summary>
    /// Reads up to <see cref="ShortHash.SerializedLength"/> bytes from <paramref name="reader"/>
    /// (an incomplete tail is allowed) and converts them into a <see cref="ShortHash"/>.
    /// </summary>
    /// <remarks>
    /// The reader is taken by reference: <see cref="SpanReader"/> is a struct, so reading through
    /// a by-value copy would leave the caller's position unchanged.
    /// </remarks>
    public static ShortHash ReadShortHash(this ref SpanReader reader)
    {
        var span = reader.ReadSpan(ShortHash.SerializedLength, allowIncomplete: true);
        return ShortHash.FromSpan(span);
    }

    /// <summary>
    /// Writes the full serialized form of <paramref name="value"/> and advances the writer
    /// by the number of bytes actually produced.
    /// </summary>
    public static void Write(this ref SpanWriter writer, in ContentHash value)
    {
        // ContentHash.Serialize(Span) emits the hash-type byte followed by the hash bytes, and the
        // underlying byte copy silently truncates when the target span is too short. Reserve the full
        // serialized size (the same bound Selector.Serialize uses) instead of MaxHashByteLength,
        // which presumably does not account for the leading hash-type byte.
        writer.EnsureLength(ContentHash.SerializedLength);
        var writtenBytes = value.Serialize(writer.Remaining);
        writer.Advance(writtenBytes);
    }
}
}

Просмотреть файл

@ -645,6 +645,12 @@ namespace BuildXL.Cache.Host.Configuration
[DataMember]
public TimeSpanSetting? EventHubFlushShutdownTimeout { get; set; }
/// <summary>
/// Optional override for the <c>UseSpanBasedSerialization</c> option of the event store configuration.
/// </summary>
[DataMember]
public bool? EventHubUseSpanBasedSerialization { get; set; }
/// <summary>
/// Optional override for the <c>SelfCheckSerialization</c> option of the event store configuration.
/// NOTE(review): what the self-check actually verifies is defined by the event store and is not visible here - confirm.
/// </summary>
[DataMember]
public bool? EventHubSelfCheckSerialization { get; set; }
[DataMember]
[Validation.Range(1, int.MaxValue)]
public int? EventProcessingMaxQueueSize { get; set; }

Просмотреть файл

@ -603,6 +603,8 @@ namespace BuildXL.Cache.Host.Service.Internal
ApplyIfNotNull(_distributedSettings.EventBatchSize, value => eventStoreConfiguration.EventBatchSize = value);
ApplyIfNotNull(_distributedSettings.EventHubFlushShutdownTimeout, value => eventStoreConfiguration.FlushShutdownTimeout = value);
ApplyIfNotNull(_distributedSettings.EventProcessingMaxQueueSize, value => eventStoreConfiguration.EventProcessingMaxQueueSize = value);
ApplyIfNotNull(_distributedSettings.EventHubUseSpanBasedSerialization, value => eventStoreConfiguration.UseSpanBasedSerialization = value);
ApplyIfNotNull(_distributedSettings.EventHubSelfCheckSerialization, value => eventStoreConfiguration.SelfCheckSerialization = value);
var azureBlobStorageCheckpointRegistryConfiguration = new AzureBlobStorageCheckpointRegistryConfiguration()
{

Просмотреть файл

@ -238,6 +238,15 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
WriteNullableArray(_payload, writer);
}
/// <summary>
/// Serializes the whole value to a span writer: the content hashes first, then the nullable payload.
/// </summary>
public void Serialize(ref SpanWriter writer)
{
    writer.Write(
        _contentHashes,
        (ref SpanWriter itemWriter, ContentHash contentHash) => itemWriter.Write(contentHash));

    WriteNullableArray(_payload, ref writer);
}
/// <summary>
/// Initializes a new instance of the <see cref="ContentHashList" /> class from its binary representation.
/// </summary>
@ -280,6 +289,20 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
}
}
/// <summary>
/// Writes a nullable byte array: a compact-encoded length followed by the bytes,
/// or a compact-encoded -1 when <paramref name="array"/> is null.
/// </summary>
public static void WriteNullableArray(byte[] array, ref SpanWriter writer)
{
    if (array == null)
    {
        // -1 is the marker for a null array.
        writer.WriteCompact(-1);
        return;
    }

    writer.WriteCompact(array.Length);
    writer.Write(array);
}
/// <nodoc />
public static byte[] ReadNullableArray(BuildXLReader reader)
{

Просмотреть файл

@ -48,6 +48,19 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
writer.Write(determinism);
}
/// <summary>
/// Serializes an instance to a span writer: a presence flag for the content hash list,
/// the list itself when present, then the length-prefixed serialized determinism.
/// </summary>
public void Serialize(ref SpanWriter writer)
{
    var contentHashList = ContentHashList;
    writer.Write(contentHashList != null);
    contentHashList?.Serialize(ref writer);

    var determinismBytes = Determinism.Serialize();
    writer.Write(determinismBytes.Length);
    writer.Write(determinismBytes);
}
/// <summary>
/// Initializes a new instance of the <see cref="ContentHashListWithDeterminism"/> struct from its binary
/// representation.

Просмотреть файл

@ -189,6 +189,19 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
_bytes.Serialize(writer, _length);
}
/// <summary>
/// Serializes the whole value to a span writer: the length first, then that many fingerprint bytes.
/// </summary>
public void Serialize(ref SpanWriter writer)
{
    Contract.Requires(Length > 0);

    writer.Write(_length);

    // Make sure the remaining span can hold the payload before writing into it directly.
    writer.EnsureLength(_length);
    int payloadBytes = _bytes.Serialize(writer.Remaining, _length);
    writer.Advance(payloadBytes);
}
/// <summary>
/// Initializes a new instance of the <see cref="Fingerprint"/> struct.
/// </summary>

Просмотреть файл

@ -53,6 +53,18 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
ContentHashList.WriteNullableArray(Output, writer);
}
/// <summary>
/// Serializes the whole value to a span writer: the content hash first, then the nullable output bytes.
/// </summary>
public void Serialize(ref SpanWriter writer)
{
    // Reserve room for a full serialized hash, write it in place and move past the written bytes.
    writer.EnsureLength(ContentHash.SerializedLength);
    writer.Advance(ContentHash.Serialize(writer.Remaining));

    ContentHashList.WriteNullableArray(Output, ref writer);
}
/// <summary>
/// Initializes a new instance of the <see cref="Selector" /> struct.
/// </summary>

Просмотреть файл

@ -50,6 +50,19 @@ namespace BuildXL.Cache.MemoizationStore.Interfaces.Sessions
Selector.Serialize(writer);
}
/// <summary>
/// Serializes the whole value to a span writer.
/// </summary>
/// <remarks>
/// The weak <see cref="Fingerprint"/> must always be written first. Key value stores rely on this order
/// to do prefix searches by weak fingerprint.
/// </remarks>
public void Serialize(ref SpanWriter writer)
{
    // Order matters here, see the remarks.
    WeakFingerprint.Serialize(ref writer);

    Selector.Serialize(ref writer);
}
/// <summary>
/// Initializes a new instance of the <see cref="StrongFingerprint" /> struct from its binary
/// representation.

Просмотреть файл

@ -4,6 +4,7 @@
using System;
using System.IO;
using System.Linq;
using System.Text;
using BuildXL.Utilities.Core;
using BuildXL.Utilities.Serialization;
using FluentAssertions;
@ -15,6 +16,81 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
{
private const int Length = 42;
// Invokes 'testFunc' 'iterationCount' times. Each call gets a string decoded (as UTF-8) from 'byteSize'
// freshly generated random bytes and a scratch buffer ten times that size, shared across iterations.
private static void StressTestWithRandomStrings(Action<string, byte[]> testFunc, int byteSize = 1024, int iterationCount = 500)
{
    var random = new Random();
    var randomBytes = new byte[byteSize];
    var scratchBuffer = new byte[byteSize * 10];

    for (int remaining = iterationCount; remaining > 0; remaining--)
    {
        random.NextBytes(randomBytes);
        testFunc(Encoding.UTF8.GetString(randomBytes), scratchBuffer);
    }
}
[Theory]
[InlineData(1)]
[InlineData(10)]
[InlineData(1024)]
[InlineData(102400)]
public void StringSerializationTests(int byteSize)
{
    // Using smaller iteration count for large inputs
    int iterations = byteSize > 100_000 ? 50 : 500;

    StressTestWithRandomStrings(roundTrip, byteSize, iterationCount: iterations);

    // Writes the string with a SpanWriter and reads it back with a SpanReader over the written bytes.
    static void roundTrip(string inputString, byte[] buffer)
    {
        var writer = new SpanWriter(buffer);
        writer.Write(inputString);

        var reader = writer.WrittenBytes.AsReader();
        reader.ReadString().Should().Be(inputString);
    }
}
[Fact]
public void WriteViaBxlWriterReadViaSpanTest()
{
    StressTestWithRandomStrings(writeWithBxlWriterReadWithSpan);

    // Writing with BuildXLWriter and reading via spans
    static void writeWithBxlWriterReadWithSpan(string inputString, byte[] buffer)
    {
        using var stream = new MemoryStream(buffer);
        using var bxlWriter = new BuildXLWriter(debug: false, stream, leaveOpen: true, logStats: false);
        bxlWriter.Write(inputString);

        stream.Position = 0;

        var spanReader = stream.ToArray().AsSpan().AsReader();
        spanReader.ReadString().Should().Be(inputString);
    }
}
[Fact]
public void WriteViaSpanWriterReadViaBxlReaderTest()
{
    StressTestWithRandomStrings(writeWithSpanReadWithBxlReader);

    // Writing via spans and reading with BuildXLReader
    static void writeWithSpanReadWithBxlReader(string inputString, byte[] buffer)
    {
        var spanWriter = buffer.AsSpan().AsWriter();
        spanWriter.Write(inputString);

        using var stream = new MemoryStream(buffer);
        using var bxlReader = new BuildXLReader(debug: false, stream: stream, leaveOpen: true);
        bxlReader.ReadString().Should().Be(inputString);
    }
}
[Fact]
public void AdvanceMovesForward()
{
@ -107,7 +183,7 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
public void UInt32Compact(uint input)
{
Test(Length,
(ref SpanWriter writer) => writer.WriteUInt32Compact(input),
(ref SpanWriter writer) => writer.WriteCompact(input),
writer => writer.WriteCompact(input),
(ref SpanReader reader) => reader.ReadUInt32Compact().Should().Be(input),
reader => reader.ReadUInt32Compact().Should().Be(input)
@ -122,7 +198,7 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
public void Int32Compact(int input)
{
Test(Length,
(ref SpanWriter writer) => writer.WriteInt32Compact(input),
(ref SpanWriter writer) => writer.WriteCompact(input),
writer => writer.WriteCompact(input),
(ref SpanReader reader) => reader.ReadInt32Compact().Should().Be(input),
reader => reader.ReadInt32Compact().Should().Be(input)
@ -137,7 +213,7 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
public void Int64Compact(long input)
{
Test(Length,
(ref SpanWriter writer) => writer.WriteInt64Compact(input),
(ref SpanWriter writer) => writer.WriteCompact(input),
writer => writer.WriteCompact(input),
(ref SpanReader reader) => reader.ReadInt64Compact().Should().Be(input),
reader => reader.ReadInt64Compact().Should().Be(input)

Просмотреть файл

@ -1,7 +1,7 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System.Buffers;
using System;
using System.Linq;
using BuildXL.Utilities.Serialization;
using FluentAssertions;
@ -21,13 +21,13 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
public void ArrayResizes(int size)
{
var input = Enumerable.Range(0, size).Select(i => unchecked((byte)i)).ToArray();
var sw = new SpanWriter(new ArrayBufferWriter<byte>(initialCapacity: 1), defaultSizeHint: 1);
var sw = new SpanWriter(new BxlArrayBufferWriter<byte>(initialCapacity: 1), defaultSizeHint: 1);
sw.WriteSpan(input);
sw.WrittenBytes.Length.Should().Be(size);
sw.WrittenBytes.ToArray().Should().BeEquivalentTo(input);
sw = new SpanWriter(new ArrayBufferWriter<byte>(initialCapacity: 1));
sw = new SpanWriter(new BxlArrayBufferWriter<byte>(initialCapacity: 1));
foreach (var b in input)
{
sw.Write(b);
@ -36,5 +36,27 @@ namespace Test.BuildXL.Utilities.SpanBasedSerialization
sw.WrittenBytes.Length.Should().Be(size);
sw.WrittenBytes.ToArray().Should().BeEquivalentTo(input);
}
[Fact]
public void MultipleWriters()
{
    // Several SpanWriter instances created over a single buffer must produce the same bytes
    // as one writer that writes the same data twice.
    var payload = Enumerable.Range(0, 9).Select(i => unchecked((byte)i)).ToArray();

    var sharedBuffer = new BxlArrayBufferWriter<byte>(initialCapacity: 10);
    var firstWriter = new SpanWriter(sharedBuffer, defaultSizeHint: 10);
    firstWriter.WriteSpan(payload);

    var secondWriter = new SpanWriter(sharedBuffer, defaultSizeHint: 1);
    secondWriter.WriteSpan(payload);

    var referenceBuffer = new BxlArrayBufferWriter<byte>(initialCapacity: 100);
    var referenceWriter = new SpanWriter(referenceBuffer, defaultSizeHint: 100);
    referenceWriter.WriteSpan(payload);
    referenceWriter.WriteSpan(payload);

    bool sameContent = sharedBuffer.WrittenSpan.SequenceEqual(referenceBuffer.WrittenSpan);
    sameContent.Should().BeTrue();
}
}
}

Просмотреть файл

@ -0,0 +1,234 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Buffers;
using System.Diagnostics;
using System.Diagnostics.ContractsLight;
using System.Runtime.CompilerServices;
// The code is adopted from here: https://github.com/dotnet/runtime/blob/57bfe474518ab5b7cfe6bf7424a79ce3af9d6657/src/libraries/Common/src/System/Buffers/ArrayBufferWriter.cs
// with some minor changes
namespace BuildXL.Utilities.Serialization
{
/// <summary>
/// Represents a heap-based, array-backed output sink into which <typeparamref name="T"/> data can be written.
/// </summary>
public sealed class BxlArrayBufferWriter<T> : IBufferWriter<T>
{
// Copy of Array.MaxLength.
// Used by projects targeting .NET Framework.
private const int ArrayMaxLength = 0x7FFFFFC7;
private const int DefaultInitialBufferSize = 256;
private T[] m_buffer;
private int m_index;
/// <summary>
/// Creates an instance of a <see cref="BxlArrayBufferWriter{T}"/> that starts with an empty underlying array.
/// </summary>
public BxlArrayBufferWriter()
{
    // m_index keeps its default value of 0: nothing has been written yet.
    m_buffer = Array.Empty<T>();
}
/// <summary>
/// Creates an instance of a <see cref="BxlArrayBufferWriter{T}"/> whose underlying buffer
/// is allocated up front with <paramref name="initialCapacity"/> elements.
/// </summary>
/// <param name="initialCapacity">
/// The initial size of the underlying buffer. Must be non-negative (zero is allowed);
/// the precondition is enforced via <c>Contract.Requires</c>.
/// </param>
public BxlArrayBufferWriter(int initialCapacity)
{
    Contract.Requires(initialCapacity >= 0);

    // m_index keeps its default value of 0: nothing has been written yet.
    m_buffer = new T[initialCapacity];
}
/// <summary>
/// Returns the data written to the underlying buffer so far, as a <see cref="ReadOnlyMemory{T}"/>.
/// </summary>
/// <remarks>
/// The result is a view over the first <see cref="WrittenCount"/> elements of the underlying array, not a copy.
/// </remarks>
public ReadOnlyMemory<T> WrittenMemory => m_buffer.AsMemory(0, m_index);
/// <summary>
/// Returns the data written to the underlying buffer so far, as a <see cref="ReadOnlySpan{T}"/>.
/// </summary>
/// <remarks>
/// The result is a view over the first <see cref="WrittenCount"/> elements of the underlying array, not a copy.
/// </remarks>
public ReadOnlySpan<T> WrittenSpan => m_buffer.AsSpan(0, m_index);
/// <summary>
/// Returns the amount of data written to the underlying buffer so far.
/// </summary>
public int WrittenCount => m_index;
/// <summary>
/// Returns the total amount of space within the underlying buffer.
/// </summary>
public int Capacity => m_buffer.Length;
/// <summary>
/// Returns the amount of space available that can still be written into without forcing the underlying buffer to grow.
/// </summary>
/// <remarks>
/// Computed as <see cref="Capacity"/> minus <see cref="WrittenCount"/>.
/// </remarks>
public int FreeCapacity => m_buffer.Length - m_index;
/// <summary>
/// Clears the data written to the underlying buffer and resets the write position to the start.
/// </summary>
/// <remarks>
/// You must clear the <see cref="BxlArrayBufferWriter{T}"/> before trying to re-use it.
/// </remarks>
public void Clear()
{
    Debug.Assert(m_buffer.Length >= m_index);

    // Only the written prefix is reset to default values; the rest of the array is left untouched.
    Span<T> written = m_buffer.AsSpan(0, m_index);
    written.Clear();

    m_index = 0;
}
/// <summary>
/// Sets the position for the buffer, i.e. the number of elements that are considered written.
/// </summary>
/// <param name="position">
/// The new position. Must be within <c>[0, Capacity]</c>; the precondition is enforced via <c>Contract.Requires</c>.
/// </param>
public void SetPosition(int position)
{
    // A negative position would corrupt WrittenSpan/WrittenMemory, so it is rejected up front.
    // A position equal to the buffer length is a valid state (a completely full buffer, which
    // Advance can produce as well); this also allows position 0 on an empty buffer.
    Contract.Requires(position >= 0 && position <= m_buffer.Length);
    m_index = position;
}
/// <summary>
/// Notifies <see cref="IBufferWriter{T}"/> that <paramref name="count"/> amount of data was written to the output <see cref="Span{T}"/>/<see cref="Memory{T}"/>
/// </summary>
/// <param name="count">
/// The number of elements written. Must be non-negative; the precondition is enforced via <c>Contract.Requires</c>.
/// </param>
/// <exception cref="InvalidOperationException">
/// Thrown when attempting to advance past the end of the underlying buffer.
/// </exception>
/// <remarks>
/// You must request a new buffer after calling Advance to continue writing more data and cannot write to a previously acquired buffer.
/// </remarks>
[MethodImpl(MethodImplOptions.AggressiveInlining
// NET6_0_OR_GREATER is the symbol defined by the .NET SDK; the previous spelling (NET60_OR_GREATER)
// is not one of the SDK-defined symbols, so the flag below was presumably never compiled in.
#if NET6_0_OR_GREATER
    | MethodImplOptions.AggressiveOptimization
#endif
    )] // This method is called a lot.
public void Advance(int count)
{
    Contract.Requires(count >= 0);

    // Written as a subtraction so that the check itself cannot overflow for a large count.
    if (m_index > m_buffer.Length - count)
    {
        ThrowInvalidOperationException_AdvancedTooFar(m_buffer.Length);
    }

    m_index += count;
}
/// <summary>
/// Returns a <see cref="Memory{T}"/> that starts at the current write index and is at least <paramref name="sizeHint"/> elements long.
/// When <paramref name="sizeHint"/> is omitted (or equal to <code>0</code>), some non-empty buffer is returned.
/// </summary>
/// <param name="sizeHint">The minimum number of elements required; must be non-negative.</param>
/// <remarks>
/// The returned <see cref="Memory{T}"/> is never empty.
/// </remarks>
/// <remarks>
/// Successive calls are not guaranteed to return the same buffer or a buffer of the same size, because the underlying array may be resized.
/// </remarks>
/// <remarks>
/// You must request a new buffer after calling Advance to continue writing more data and cannot write to a previously acquired buffer.
/// </remarks>
public Memory<T> GetMemory(int sizeHint = 0)
{
    // Grows the underlying array when the free tail is smaller than the hint.
    CheckAndResizeBuffer(sizeHint);
    Debug.Assert(m_index < m_buffer.Length);

    Memory<T> freeTail = m_buffer.AsMemory(m_index);
    return freeTail;
}
/// <summary>
/// Returns a <see cref="Span{T}"/> over the underlying buffer after making sure that at least
/// <paramref name="sizeHint"/> elements are free past the current write index.
/// </summary>
/// <param name="sizeHint">The minimum number of free elements required; must be non-negative.</param>
/// <param name="fromStart">
/// When <c>true</c>, the span starts at index 0 of the underlying buffer and therefore also covers the data written so far;
/// when <c>false</c>, the span starts at the current write index.
/// </param>
public Span<T> GetSpan(int sizeHint, bool fromStart)
{
    CheckAndResizeBuffer(sizeHint);
    Debug.Assert(m_index < m_buffer.Length);

    if (fromStart)
    {
        return m_buffer.AsSpan(0);
    }

    return m_buffer.AsSpan(m_index);
}
/// <summary>
/// Returns a <see cref="Span{T}"/> that starts at the current write index and is at least <paramref name="sizeHint"/> elements long.
/// When <paramref name="sizeHint"/> is omitted (or equal to <code>0</code>), some non-empty buffer is returned.
/// </summary>
/// <param name="sizeHint">The minimum number of elements required; must be non-negative.</param>
/// <remarks>
/// The returned <see cref="Span{T}"/> is never empty.
/// </remarks>
/// <remarks>
/// Successive calls are not guaranteed to return the same buffer or a buffer of the same size, because the underlying array may be resized.
/// </remarks>
/// <remarks>
/// You must request a new buffer after calling Advance to continue writing more data and cannot write to a previously acquired buffer.
/// </remarks>
public Span<T> GetSpan(int sizeHint = 0)
{
    // The IBufferWriter contract hands out the free tail only, never the already-written prefix.
    return GetSpan(sizeHint, fromStart: false);
}
/// <summary>
/// Makes sure that at least <paramref name="sizeHint"/> elements (or one element when the hint is 0)
/// are free past the current write index, resizing the underlying array when they are not.
/// </summary>
/// <param name="sizeHint">The minimum number of free elements required; must be non-negative.</param>
private void CheckAndResizeBuffer(int sizeHint)
{
    Contract.Requires(sizeHint >= 0);
    // A hint of 0 means "any non-empty buffer", so treat it as a request for a single element.
    if (sizeHint == 0)
    {
        sizeHint = 1;
    }
    if (sizeHint > FreeCapacity)
    {
        int currentLength = m_buffer.Length;
        // Attempt to grow by the larger of the sizeHint and double the current size.
        int growBy = Math.Max(sizeHint, currentLength);
        // An empty buffer has nothing to double, so start from at least the default initial size.
        if (currentLength == 0)
        {
            growBy = Math.Max(growBy, DefaultInitialBufferSize);
        }
        int newSize = currentLength + growBy;
        // If the addition above overflowed, newSize is negative and its uint reinterpretation exceeds int.MaxValue.
        if ((uint)newSize > int.MaxValue)
        {
            // Attempt to grow to ArrayMaxLength.
            // 'needed' is the number of elements already written plus the requested hint
            // (currentLength - FreeCapacity equals the current write index).
            uint needed = (uint)(currentLength - FreeCapacity + sizeHint);
            Debug.Assert(needed > currentLength);
            // ArrayMaxLength is declared outside this view; presumably the largest array length the runtime allows.
            if (needed > ArrayMaxLength)
            {
                ThrowOutOfMemoryException(needed);
            }
            newSize = ArrayMaxLength;
        }
        // Array.Resize allocates a new array and copies the existing elements into it.
        Array.Resize(ref m_buffer, newSize);
    }
    Debug.Assert(FreeCapacity > 0 && FreeCapacity >= sizeHint);
}
// Kept as a separate method so the throw is not part of the body of the (aggressively inlined) caller.
private static void ThrowInvalidOperationException_AdvancedTooFar(int capacity)
    => throw new InvalidOperationException($"Cannot advance past the end of the buffer, which has a size of {capacity}.");
// Reports that the requested buffer size cannot be satisfied by a single array.
private static void ThrowOutOfMemoryException(uint capacity)
    => throw new OutOfMemoryException($"Cannot allocate a buffer of size {capacity}.");
}
}

Просмотреть файл

@ -6,6 +6,8 @@ using System.Buffers.Binary;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
using System.Text;
using BuildXL.Utilities.Core;
#nullable enable
@ -55,15 +57,9 @@ namespace BuildXL.Utilities.Serialization
}
/// <nodoc />
public static void WriteInt32Compact(this ref SpanWriter writer, int value)
public static void WriteCompact(this ref SpanWriter writer, uint value)
{
writer.WriteUInt64Compact(unchecked((uint)value));
}
/// <nodoc />
public static void WriteUInt32Compact(this ref SpanWriter writer, uint value)
{
writer.WriteUInt64Compact(unchecked((ulong)value));
writer.Write7BitEncodedInt(unchecked((int)value));
}
/// <nodoc />
@ -82,20 +78,42 @@ namespace BuildXL.Utilities.Serialization
return reader.Read7BitEncodedLong();
}
/// <nodoc />
public static void WriteInt64Compact(this ref SpanWriter writer, long value)
/// <summary>
/// Compactly writes an int
/// </summary>
public static void WriteCompact(this ref SpanWriter writer, int value)
{
writer.WriteUInt64Compact(unchecked((ulong)value));
}
/// <nodoc />
public static void WriteCompact(this ref SpanWriter writer, long value)
{
writer.WriteUInt64Compact(unchecked((ulong)value));
writer.Write7BitEncodedInt(value);
}
/// <nodoc />
public static void WriteUInt64Compact(this ref SpanWriter writer, ulong value)
public static void WriteCompact(this ref SpanWriter writer, long value)
{
writer.WriteCompact(unchecked((ulong)value));
}
/// <summary>
/// Writes <paramref name="value"/> in the variable-length 7-bit encoding: seven payload bits per byte, least significant group first.
/// </summary>
public static void Write7BitEncodedInt(this ref SpanWriter writer, int value)
{
    // Work on the unsigned bit pattern so that the right shift below never sign-extends.
    uint remaining = unchecked((uint)value);

    // Emit one byte per 7-bit group. The high bit of a byte, when set, tells the reader
    // that more bytes follow. (0x7F / ~0x7F give smaller codegen than the constant 0x80.)
    for (; remaining > 0x7Fu; remaining >>= 7)
    {
        writer.Write(unchecked((byte)(remaining | ~0x7Fu)));
    }

    // The last group fits into 7 bits, so its high bit is clear and terminates the sequence.
    writer.Write((byte)remaining);
}
/// <nodoc />
public static void WriteCompact(this ref SpanWriter writer, ulong value)
{
int writeCount = 1;
unchecked
@ -287,21 +305,187 @@ namespace BuildXL.Utilities.Serialization
}
/// <summary>
/// Writes an array
/// Writes an array.
/// </summary>
public static void Write<T>(this ref SpanWriter writer, T[] value, WriteItemToSpan<T> write)
{
    // The list type is passed as a generic argument so the shared helper works on the concrete array type.
    WriteReadOnlyListCore<T, T[]>(ref writer, value, write);
}
/// <summary>
/// Writes a read-only list by delegating to the shared list-writing helper, which uses <paramref name="write"/> for each item.
/// </summary>
public static void Write<T>(this ref SpanWriter writer, IReadOnlyList<T> value, WriteItemToSpan<T> write)
{
    WriteReadOnlyListCore<T, IReadOnlyList<T>>(ref writer, value, write);
}
private static void WriteReadOnlyListCore<T, TReadOnlyList>(this ref SpanWriter writer, TReadOnlyList value, WriteItemToSpan<T> write)
where TReadOnlyList : IReadOnlyList<T>
{
writer.WriteInt32Compact(value.Count);
writer.WriteCompact(value.Count);
for (int i = 0; i < value.Count; i++)
{
write(ref writer, value[i]);
}
}
/// <summary>
/// Reads a length-prefixed string from <paramref name="reader"/>.
/// </summary>
/// <param name="reader">The reader positioned at the 7-bit encoded byte length of the string.</param>
/// <param name="encoding">The encoding of the string bytes; UTF-8 when <c>null</c>.</param>
/// <returns>The decoded string; <see cref="string.Empty"/> when the stored byte length is 0.</returns>
/// <exception cref="InvalidOperationException">Thrown when the stored length is negative.</exception>
/// <exception cref="System.IO.EndOfStreamException">Thrown when the reader runs out of data before the whole string is read.</exception>
public static string ReadString(this ref SpanReader reader, Encoding? encoding = null)
{
    // Adopted from BinaryReader.ReadString.
    const int MaxCharBytesSize = 128;

    // Length of the string in bytes, not chars
    int stringLength = reader.Read7BitEncodedInt();
    if (stringLength < 0)
    {
        throw new InvalidOperationException($"The string length is invalid {stringLength}");
    }

    if (stringLength == 0)
    {
        return string.Empty;
    }

    // The decoder and the buffers are only needed for non-empty strings, so they are created after the early returns.
    encoding ??= Encoding.UTF8;
    var maxCharsSize = encoding.GetMaxCharCount(MaxCharBytesSize);
    var decoder = encoding.GetDecoder();

    // pooled buffers
    using var pooledCharBuffer = Pools.GetCharArray(maxCharsSize);
    var charBuffer = pooledCharBuffer.Instance;
    StringBuilder? sb = null;
    int currPos = 0;
    do
    {
        // Decode in chunks of at most MaxCharBytesSize bytes so that the pooled char buffer is always big enough.
        int readLength = Math.Min(stringLength - currPos, MaxCharBytesSize);
        var span = reader.ReadSpan(readLength, allowIncomplete: true);
        int n = span.Length;
        if (n == 0)
        {
            // Truncated input: same exception type BinaryReader uses for this condition.
            throw new System.IO.EndOfStreamException("Unexpected end of binary stream.");
        }

#if NET5_0_OR_GREATER
        int charsRead = decoder.GetChars(span, charBuffer.AsSpan(), flush: false);
#else
        var charBytes = span.ToArray();
        int charsRead = decoder.GetChars(charBytes, 0, n, charBuffer, 0);
#endif
        // Fast path: the whole string arrived in the first chunk, so no StringBuilder is needed.
        if (currPos == 0 && span.Length == stringLength)
        {
            return new string(charBuffer, 0, charsRead);
        }

        // Since we could be reading from an untrusted data source, limit the initial size of the
        // StringBuilder instance we're about to get or create. It'll expand automatically as needed.
        sb ??= StringBuilderCache.Acquire(Math.Min(stringLength, StringBuilderCache.MaxBuilderSize)); // Actual string length in chars may be smaller.
        sb.Append(charBuffer, 0, charsRead);
        currPos += n;
    } while (currPos < stringLength);

    // The multi-chunk path went through a cached StringBuilder; hand it back to the cache.
    return StringBuilderCache.GetStringAndRelease(sb);
}
private const int MaxArrayPoolRentalSize = 64 * 1024; // try to keep rentals to a reasonable size
/// <summary>
/// Writes a given <paramref name="value"/> into <paramref name="writer"/> as a 7-bit encoded byte length followed by the encoded bytes.
/// </summary>
/// <param name="writer">The writer to write into.</param>
/// <param name="value">The string to write.</param>
/// <param name="encoding">The encoding used to produce the bytes; UTF-8 when <c>null</c>.</param>
public static void Write(this ref SpanWriter writer, string value, Encoding? encoding = null)
{
    encoding ??= Encoding.UTF8;
    // Adopted from BinaryWriter.Write(string value)
#if NET5_0_OR_GREATER
    // The two fast paths below size their buffers assuming at most 3 bytes per char. That only holds for UTF-8
    // with a fallback that produces at most one char, so (like BinaryWriter) every other encoding must take the
    // slow path; otherwise e.g. UTF-32 overflows the buffers and GetBytes throws.
    const int Utf8CodePage = 65001;
    bool useFastUtf8 = encoding.CodePage == Utf8CodePage && encoding.EncoderFallback.MaxCharCount <= 1;

    if (useFastUtf8)
    {
        if (value.Length <= 127 / 3)
        {
            // Max expansion: each char -> 3 bytes, so 127 bytes max of data, +1 for length prefix
            Span<byte> buffer = stackalloc byte[128];
            int actualByteCount = encoding.GetBytes(value, buffer.Slice(1));
            buffer[0] = (byte)actualByteCount; // bypass call to Write7BitEncodedInt
            var slice = buffer.Slice(0, actualByteCount + 1 /* length prefix */);
            writer.WriteSpan(slice);
            return;
        }

        if (value.Length <= MaxArrayPoolRentalSize / 3)
        {
            using var wrapper = Pools.GetByteArray(value.Length * 3); // max expansion: each char -> 3 bytes
            var rented = wrapper.Instance;
            int actualByteCount = encoding.GetBytes(value, rented);
            writer.Write7BitEncodedInt(actualByteCount);
            writer.Write(rented.AsSpan(0, actualByteCount));
            return;
        }
    }

    // Slow path: compute the exact byte count first, then encode in bounded chunks.
    writer.Write7BitEncodedInt(encoding.GetByteCount(value));
    WriteCharsCommonWithoutLengthPrefix(ref writer, value, encoding);
#else
    // Fairly naive version for the full framework.
    var bytes = encoding.GetBytes(value);
    writer.Write7BitEncodedInt(bytes.Length);
    writer.Write(bytes);
#endif
}
#if NET5_0_OR_GREATER
// Encodes 'chars' with 'encoding' and writes the resulting bytes to 'writer' without any length prefix.
private static void WriteCharsCommonWithoutLengthPrefix(this ref SpanWriter writer, ReadOnlySpan<char> chars, Encoding encoding)
{
    // For a truly enormous input the call to GetMaxByteCount might overflow, so it is only made
    // when the char count itself is small. Theoretically, any Encoding could expand chars -> bytes
    // at an enormous ratio even for small inputs, but that is too unrealistic to worry about.
    if (chars.Length <= MaxArrayPoolRentalSize)
    {
        // GetMaxByteCount is preferred over GetByteCount: it is constant-time, whereas
        // GetByteCount may walk the buffer contents, resulting in 2 passes over the data.
        int upperBound = encoding.GetMaxByteCount(chars.Length);
        if (upperBound <= MaxArrayPoolRentalSize)
        {
            // Everything fits into a single pooled array: encode once and write once.
            using var singleShotLease = Pools.GetByteArray(upperBound);
            byte[] singleShot = singleShotLease.Instance;
            int encodedCount = encoding.GetBytes(chars, singleShot);
            WriteToOutStream(ref writer, singleShot, 0, encodedCount);
            return;
        }
    }

    // We're dealing with an enormous amount of data, so acquire an Encoder and transcode chunk by chunk.
    // It should be rare that callers pass sufficiently large inputs to hit this code path, and the cost
    // is dominated by the transcoding step anyway, so the Encoder allocation is acceptable here.
    using var chunkLease = Pools.GetByteArray(MaxArrayPoolRentalSize);
    byte[] chunk = chunkLease.Instance;
    Encoder encoder = encoding.GetEncoder();

    while (true)
    {
        encoder.Convert(chars, chunk, flush: true, charsUsed: out int charsConsumed, bytesUsed: out int bytesWritten, completed: out bool completed);
        if (bytesWritten != 0)
        {
            WriteToOutStream(ref writer, chunk, 0, bytesWritten);
        }

        // Drop the chars that were just consumed; the next iteration continues with the remainder.
        chars = chars.Slice(charsConsumed);
        if (completed)
        {
            break;
        }
    }
}
// Writes 'count' bytes of 'buffer', starting at 'offset', to 'writer'.
private static void WriteToOutStream(ref SpanWriter writer, byte[] buffer, int offset, int count)
    => writer.Write(buffer.AsSpan(offset, count));
#endif
}
}

Просмотреть файл

@ -2,10 +2,7 @@
// Licensed under the MIT License.
using System;
using System.Buffers;
using System.Diagnostics.ContractsLight;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
#nullable enable
@ -18,14 +15,14 @@ namespace BuildXL.Utilities.Serialization
/// The main purpose of this type is serializing instances directly to spans.
/// Because it's a struct, the serialization methods must take the instance by ref so that the calling methods can "observe" the position
/// changes that happen during serialization.
/// The instance might be created with <see cref="ArrayBufferWriter{T}"/> to allow writing into an expandable buffers.
/// The instance might be created with <see cref="BxlArrayBufferWriter{T}"/> to allow writing into an expandable buffers.
/// </remarks>
public ref struct SpanWriter
{
/// <summary>
/// An optional expandable buffer writer the current writer writes to.
/// </summary>
private readonly ArrayBufferWriter<byte>? m_bufferWriter;
private readonly BxlArrayBufferWriter<byte>? m_bufferWriter;
// Re-computing the remaining length instead of computing it on the fly, because
// we access 'RemainingLength' property on a hot path and its cheaper to update
@ -47,6 +44,15 @@ namespace BuildXL.Utilities.Serialization
readonly get => m_position;
set
{
if (m_bufferWriter is not null)
{
var count = value - m_position;
if (count > 0)
{
m_bufferWriter.Advance(count);
}
}
m_position = value;
m_remainingLength = Span.Length - m_position;
}
@ -71,7 +77,7 @@ namespace BuildXL.Utilities.Serialization
public Span<byte> WrittenBytes => Span.Slice(0, Position);
/// <nodoc />
public SpanWriter(ArrayBufferWriter<byte> bufferWriter, int defaultSizeHint = 4 * 1024)
public SpanWriter(BxlArrayBufferWriter<byte> bufferWriter, int defaultSizeHint = 4 * 1024)
{
m_bufferWriter = bufferWriter;
@ -80,6 +86,16 @@ namespace BuildXL.Utilities.Serialization
m_remainingLength = Span.Length;
}
/// <summary>
/// Creates a writer over an already obtained <paramref name="span"/> of <paramref name="bufferWriter"/>,
/// continuing at <paramref name="position"/> instead of starting from 0.
/// </summary>
private SpanWriter(BxlArrayBufferWriter<byte> bufferWriter, Span<byte> span, int position)
{
    m_bufferWriter = bufferWriter;
    m_position = position;
    Span = span;

    // Cached rather than computed on demand: the remaining length is whatever lies past the position.
    m_remainingLength = span.Length - position;
}
/// <nodoc />
public SpanWriter(Span<byte> span)
{
@ -124,14 +140,18 @@ namespace BuildXL.Utilities.Serialization
}
/// <nodoc />
public void WriteSpan(ReadOnlySpan<byte> source)
[MethodImpl(MethodImplOptions.AggressiveInlining)] // Forcing the inlining of this method, because it's used very often.
public void WriteSpan(scoped ReadOnlySpan<byte> source)
{
    int length = source.Length;
    EnsureLength(length);

    // Span and Position are read only after EnsureLength, since it may replace 'this' with a writer over a bigger span.
    Span<byte> destination = Span.Slice(Position, length);
    source.CopyTo(destination);
    Position += length;
}
internal void EnsureLength(int minLength)
/// <summary>
/// Makes sure that the writer has enough remaining space for at least <paramref name="minLength"/> more bytes.
/// </summary>
public void EnsureLength(int minLength)
{
if (RemainingLength < minLength)
{
@ -149,9 +169,17 @@ namespace BuildXL.Utilities.Serialization
// To do that we have to specify a bigger size when calling 'ArrayBufferWriter{T}.GetSpan()'.
// So in case when we don't have enough space, we just re-creating a span writer
// with the size hint enough to keep the required data.
var other = new SpanWriter(m_bufferWriter, (Position + minLength) * 2);
other.WriteSpan(WrittenBytes);
// Need to set the position to 0, because otherwise GetSpan call used in the SpanWriter's constructor
// will get the span not from the beginning of the array but from the current index.
var start = m_bufferWriter.WrittenCount - Position;
var newSpan = m_bufferWriter.GetSpan(sizeHint: (Position + minLength) * 2, fromStart: true).Slice(start);
var other = new SpanWriter(m_bufferWriter, newSpan, Position);
//var start = m_bufferWriter.WrittenCount - Position;
//m_bufferWriter.SetPosition(0);
//var other = new SpanWriter(m_bufferWriter, (Position + minLength) * 2);
//m_bufferWriter.SetPosition(start);
//other.WriteSpan(WrittenBytes);
this = other;
}
else

Просмотреть файл

@ -0,0 +1,68 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using System;
using System.Text;
#nullable enable
namespace BuildXL.Utilities.Serialization
{
// The code is adopted from the following place: https://github.com/dotnet/runtime/blob/118a1628c959e5c5895f9971104739c38cdc253b/src/libraries/Common/src/System/Text/StringBuilderCache.cs
/// <summary>Keeps at most one reusable <see cref="StringBuilder"/> per thread to avoid repeated allocations.</summary>
internal static class StringBuilderCache
{
    // The value 360 was chosen in discussion with performance experts as a compromise between using
    // as little memory per thread as possible and still covering a large part of short-lived
    // StringBuilder creations on the startup path of VS designers.
    internal const int MaxBuilderSize = 360;

    private const int DefaultCapacity = 16; // == StringBuilder.DefaultCapacity

    // WARNING: We allow diagnostic tools to directly inspect this member (t_cachedInstance).
    // See https://github.com/dotnet/corert/blob/master/Documentation/design-docs/diagnostics/diagnostics-tools-contract.md for more details.
    // Please do not change the type, the name, or the semantic usage of this member without understanding the implication for tools.
    // Get in touch with the diagnostics team if you have questions.
    [ThreadStatic]
    private static StringBuilder? t_cachedInstance;

    /// <summary>Returns a cleared StringBuilder with at least the requested capacity.</summary>
    /// <remarks>
    /// The cached instance is handed out (and the cache emptied) only when the request does not exceed
    /// <see cref="MaxBuilderSize"/> and the cached instance is already big enough; otherwise a new instance is created.
    /// </remarks>
    public static StringBuilder Acquire(int capacity = DefaultCapacity)
    {
        // Requests above the cache limit never touch the cache.
        if (capacity > MaxBuilderSize)
        {
            return new StringBuilder(capacity);
        }

        StringBuilder? cached = t_cachedInstance;

        // Avoid stringbuilder block fragmentation: when the requested size is larger than the
        // cached capacity, create a new StringBuilder instead of growing the cached one.
        if (cached == null || capacity > cached.Capacity)
        {
            return new StringBuilder(capacity);
        }

        t_cachedInstance = null;
        cached.Clear();
        return cached;
    }

    /// <summary>Stores the given builder in the per-thread cache unless its capacity exceeds <see cref="MaxBuilderSize"/>.</summary>
    public static void Release(StringBuilder sb)
    {
        if (sb.Capacity > MaxBuilderSize)
        {
            // Too big to keep around; let the GC have it.
            return;
        }

        t_cachedInstance = sb;
    }

    /// <summary>Materializes the builder's content, releases the builder to the cache, and returns the content.</summary>
    public static string GetStringAndRelease(StringBuilder sb)
    {
        string text = sb.ToString();
        Release(sb);
        return text;
    }
}
}

Просмотреть файл

@ -2,7 +2,6 @@
// Licensed under the MIT License.
using System;
using System.Buffers;
using System.IO;
using BuildXL.Utilities.Core;
using BuildXL.Utilities.Serialization;
@ -10,18 +9,16 @@ using BuildXL.Utilities.Serialization;
namespace BuildXL.Utilities
{
/// <summary>
/// A pooled handle for <see cref="ArrayBufferWriter{T}"/> used during serialization.
/// A pooled handle for <see cref="BxlArrayBufferWriter{T}"/> used during serialization.
/// </summary>
public readonly struct PooledArrayBuffer : IDisposable
{
private readonly PooledObjectWrapper<ArrayBufferWriter<byte>> m_writer;
private readonly int m_writtenCount;
private readonly PooledObjectWrapper<BxlArrayBufferWriter<byte>> m_writer;
/// <nodoc />
internal PooledArrayBuffer(PooledObjectWrapper<ArrayBufferWriter<byte>> writer, int writtenCount)
internal PooledArrayBuffer(PooledObjectWrapper<BxlArrayBufferWriter<byte>> writer)
{
m_writer = writer;
m_writtenCount = writtenCount;
}
/// <inheritdoc />
@ -33,7 +30,7 @@ namespace BuildXL.Utilities
/// <summary>
/// Gets the span of bytes written by the span writer.
/// </summary>
public ReadOnlySpan<byte> WrittenSpan => m_writer.Instance.GetSpan().Slice(0, length: m_writtenCount);
public ReadOnlySpan<byte> WrittenSpan => m_writer.Instance.WrittenSpan;
}
/// <nodoc />
@ -87,7 +84,7 @@ namespace BuildXL.Utilities
private readonly ObjectPool<StreamBinaryWriter> m_writerPool = new(static () => new StreamBinaryWriter(), static w => w.ResetPosition());
private readonly ObjectPool<StreamBinaryReader> m_readerPool = new(static () => new StreamBinaryReader(), static r => { });
private readonly ObjectPool<ArrayBufferWriter<byte>> m_arrayBufferWriters = new(() => new ArrayBufferWriter<byte>(), bw => (bw).Clear());
private readonly ObjectPool<BxlArrayBufferWriter<byte>> m_arrayBufferWriters = new(() => new BxlArrayBufferWriter<byte>(), bw => (bw).Clear());
/// <nodoc />
public byte[] Serialize<TResult>(TResult instance, Action<TResult, BuildXLWriter> serializeFunc)
@ -109,7 +106,7 @@ namespace BuildXL.Utilities
public delegate void SerializeDelegate<in T>(T instance, ref SpanWriter writer);
/// <summary>
/// Serialize a given <paramref name="instance"/> with pooled <see cref="ArrayBufferWriter{T}"/>.
/// Serialize a given <paramref name="instance"/> with pooled <see cref="BxlArrayBufferWriter{T}"/>.
/// </summary>
public PooledArrayBuffer SerializePooled<T>(T instance, SerializeDelegate<T> serializeFunc)
{
@ -117,7 +114,7 @@ namespace BuildXL.Utilities
var writer = new SpanWriter(arrayBuffer.Instance);
serializeFunc(instance, ref writer);
return new PooledArrayBuffer(arrayBuffer, writer.Position);
return new PooledArrayBuffer(arrayBuffer);
}

Просмотреть файл

@ -50,7 +50,7 @@ namespace BuildXL.Utilities
/// <summary>
/// Helper struct that restores the position of <see cref="StreamBinaryWriter"/>.
/// </summary>
public struct PositionRestorer : IDisposable
public readonly struct PositionRestorer : IDisposable
{
private readonly StreamBinaryWriter _writer;