зеркало из https://github.com/mozilla/gecko-dev.git
520 строки
19 KiB
C++
520 строки
19 KiB
C++
/* -*- Mode: C++; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
|
|
/* vim: set ts=8 sts=2 et sw=2 tw=80: */
|
|
/* This Source Code Form is subject to the terms of the Mozilla Public
|
|
* License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
|
|
|
|
#ifndef mozilla_MultiWriterQueue_h_
|
|
#define mozilla_MultiWriterQueue_h_
|
|
|
|
#include <cstdint>
|
|
#include <utility>
|
|
|
|
#include "RollingNumber.h"
|
|
#include "mozilla/Atomics.h"
|
|
#include "mozilla/MemoryReporting.h"
|
|
#include "mozilla/Mutex.h"
|
|
#include "prthread.h"
|
|
|
|
namespace mozilla {
|
|
|
|
// Default reader locking strategy, using a mutex to ensure that concurrent
|
|
// PopAll calls won't overlap.
|
|
class MultiWriterQueueReaderLocking_Mutex {
|
|
public:
|
|
MultiWriterQueueReaderLocking_Mutex()
|
|
: mMutex("MultiWriterQueueReaderLocking_Mutex") {}
|
|
void Lock() { mMutex.Lock(); };
|
|
void Unlock() { mMutex.Unlock(); };
|
|
|
|
private:
|
|
Mutex mMutex;
|
|
};
|
|
|
|
// Reader non-locking strategy, trusting that PopAll will never be called
|
|
// concurrently (e.g., by only calling it from a specific thread).
|
|
class MultiWriterQueueReaderLocking_None {
 public:
#ifdef DEBUG
  // DEBUG builds keep a "held" flag and assert on each transition, so that
  // overlapping Lock()/Unlock() pairs are caught.
  void Lock() {
    MOZ_ASSERT(mLocked.compareExchange(false, true));
  }

  void Unlock() {
    MOZ_ASSERT(mLocked.compareExchange(true, false));
  }

 private:
  // True between a Lock() and its matching Unlock().
  Atomic<bool> mLocked{false};
#else
  // Non-DEBUG builds: both operations are no-ops.
  void Lock() {}

  void Unlock() {}
#endif
};
|
|
|
|
// Element count per buffer used when no explicit size is requested.
static constexpr uint32_t MultiWriterQueueDefaultBufferSize{8192};
|
|
|
|
// Multi-writer, single-reader queue of elements of type `T`.
|
|
// Elements are bunched together in buffers of `BufferSize` elements.
|
|
//
|
|
// This queue is heavily optimized for pushing. In most cases pushes will only
|
|
// cost a couple of atomic reads and a few non-atomic reads. Worst cases:
|
|
// - Once per buffer, a push will allocate or reuse a buffer for later pushes;
|
|
// - During the above new-buffer push, other pushes will be blocked.
|
|
//
|
|
// By default, popping is protected by mutex; it may be disabled if popping is
|
|
// guaranteed never to be concurrent.
|
|
// In any case, popping will never negatively impact pushes.
|
|
// (However, *not* popping will add runtime costs, as unread buffers will not
|
|
// be freed, or made available to future pushes; Push functions provide
|
|
// feedback as to when popping would be most efficient.)
|
|
template <typename T, uint32_t BufferSize = MultiWriterQueueDefaultBufferSize,
|
|
typename ReaderLocking = MultiWriterQueueReaderLocking_Mutex>
|
|
class MultiWriterQueue {
|
|
static_assert(BufferSize > 0, "0-sized MultiWriterQueue buffer");
|
|
|
|
public:
|
|
// Constructor.
|
|
// Allocates the initial buffer that will receive the first `BufferSize`
|
|
// elements. Also allocates one reusable buffer, which will definitely be
|
|
// needed after the first `BufferSize` elements have been pushed.
|
|
// Ideally (if the reader can process each buffer quickly enough), there
|
|
// won't be a need for more buffer allocations.
|
|
MultiWriterQueue()
|
|
: mBuffersCoverAtLeastUpTo(BufferSize - 1),
|
|
mMostRecentBuffer(new Buffer{}),
|
|
mReusableBuffers(new Buffer{}),
|
|
mOldestBuffer(static_cast<Buffer*>(mMostRecentBuffer)),
|
|
mLiveBuffersStats(1),
|
|
mReusableBuffersStats(1),
|
|
mAllocatedBuffersStats(2) {}
|
|
|
|
~MultiWriterQueue() {
  // Two chains own buffers: the live deque (walked from its newest end) and
  // the reusable stack. Both are linked through Older().
  Buffer* const chainHeads[] = {mMostRecentBuffer, mReusableBuffers};
  for (Buffer* cursor : chainHeads) {
    while (cursor) {
      Buffer* const doomed = cursor;
      // Fetch the link before the node goes away.
      cursor = cursor->Older();
      delete doomed;
    }
  }
}
|
|
|
|
// We need the index to be order-resistant to overflow, i.e., numbers before
|
|
// an overflow should test smaller-than numbers after the overflow.
|
|
// This is because we keep pushing elements with increasing Index, and this
|
|
// Index is used to find the appropriate buffer based on a range; and this
|
|
// needs to work smoothly when crossing the overflow boundary.
|
|
using Index = RollingNumber<uint32_t>;
|
|
|
|
// Pushes indicate whether they have just reached the end of a buffer.
|
|
using DidReachEndOfBuffer = bool;
|
|
|
|
// Push new element and call aF on it.
|
|
// Element may be in just-created state, or recycled after a PopAll call.
|
|
// Atomically thread-safe; in the worst case some pushes may be blocked
|
|
// while a new buffer is created/reused for them.
|
|
// Returns whether that push reached the end of a buffer; useful if caller
|
|
// wants to trigger processing regularly at the most efficient time.
|
|
template <typename F>
|
|
DidReachEndOfBuffer PushF(F&& aF) {
|
|
// Atomically claim ownership of the next available element.
|
|
const Index index{mNextElementToWrite++};
|
|
// And now go and set that element.
|
|
for (;;) {
|
|
Index lastIndex{mBuffersCoverAtLeastUpTo};
|
|
|
|
if (MOZ_UNLIKELY(index == lastIndex)) {
|
|
// We have claimed the last element in the current head -> Allocate a
|
|
// new head in advance of more pushes. Make it point at the current
|
|
// most-recent buffer.
|
|
// This whole process is effectively guarded:
|
|
// - Later pushes will wait until mBuffersCoverAtLeastUpTo changes to
|
|
// one that can accept their claimed index.
|
|
// - Readers will stop until the last element is marked as valid.
|
|
Buffer* ourBuffer = mMostRecentBuffer;
|
|
Buffer* newBuffer = NewBuffer(ourBuffer, index + 1);
|
|
// Because we have claimed this very specific index, we should be the
|
|
// only one touching the most-recent buffer pointer.
|
|
MOZ_ASSERT(mMostRecentBuffer == ourBuffer);
|
|
// Just pivot the most-recent buffer pointer to our new buffer.
|
|
mMostRecentBuffer = newBuffer;
|
|
// Because we have claimed this very specific index, we should be the
|
|
// only one touching the buffer coverage watermark.
|
|
MOZ_ASSERT(mBuffersCoverAtLeastUpTo == lastIndex.Value());
|
|
// Update it to include the just-added most-recent buffer.
|
|
mBuffersCoverAtLeastUpTo = index.Value() + BufferSize;
|
|
// We know for sure that `ourBuffer` is the correct one for this index.
|
|
ourBuffer->SetAndValidateElement(aF, index);
|
|
// And indicate that we have reached the end of a buffer.
|
|
return true;
|
|
}
|
|
|
|
if (MOZ_UNLIKELY(index > lastIndex)) {
|
|
// We have claimed an element in a yet-unavailable buffer, wait for our
|
|
// target buffer to be created (see above).
|
|
while (Index(mBuffersCoverAtLeastUpTo) < index) {
|
|
PR_Sleep(PR_INTERVAL_NO_WAIT); // Yield
|
|
}
|
|
// Then loop to examine the new situation.
|
|
continue;
|
|
}
|
|
|
|
// Here, we have claimed a number that is covered by current buffers.
|
|
// These buffers cannot be destroyed, because our buffer is not filled
|
|
// yet (we haven't written in it yet), therefore the reader thread will
|
|
// have to stop there (or before) and won't destroy our buffer or more
|
|
// recent ones.
|
|
MOZ_ASSERT(index < lastIndex);
|
|
Buffer* ourBuffer = mMostRecentBuffer;
|
|
|
|
// In rare situations, another thread may have had the time to create a
|
|
// new more-recent buffer, in which case we need to find our older buffer.
|
|
while (MOZ_UNLIKELY(index < ourBuffer->Origin())) {
|
|
// We assume that older buffers with still-invalid elements (e.g., the
|
|
// one we have just claimed) cannot be destroyed.
|
|
MOZ_ASSERT(ourBuffer->Older());
|
|
ourBuffer = ourBuffer->Older();
|
|
}
|
|
|
|
// Now we can set&validate the claimed element, and indicate that we have
|
|
// not reached the end of a buffer.
|
|
ourBuffer->SetAndValidateElement(aF, index);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
// Push new element and assign it a value.
|
|
// Atomically thread-safe; in the worst case some pushes may be blocked
|
|
// while a new buffer is created/reused for them.
|
|
// Returns whether that push reached the end of a buffer; useful if caller
|
|
// wants to trigger processing regularly at the most efficient time.
|
|
DidReachEndOfBuffer Push(const T& aT) {
  // Copy-assign aT into the slot handed out by PushF; the index argument is
  // not needed here.
  auto copyIntoSlot = [&aT](T& aSlot, Index) { aSlot = aT; };
  return PushF(copyIntoSlot);
}
|
|
|
|
// Push new element and move-assign it a value.
|
|
// Atomically thread-safe; in the worst case some pushes may be blocked
|
|
// while a new buffer is created/reused for them.
|
|
// Returns whether that push reached the end of a buffer; useful if caller
|
|
// wants to trigger processing regularly at the most efficient time.
|
|
DidReachEndOfBuffer Push(T&& aT) {
  // Move-assign aT into the slot handed out by PushF; the index argument is
  // not needed here.
  auto moveIntoSlot = [&aT](T& aSlot, Index) { aSlot = std::move(aT); };
  return PushF(moveIntoSlot);
}
|
|
|
|
// Pop all elements before the first invalid one, running aF on each of them
|
|
// in FIFO order.
|
|
// Thread-safety with other PopAll calls is controlled by the `ReaderLocking`
|
|
// template argument.
|
|
// Concurrent pushes are always allowed, because:
|
|
// - PopAll won't read elements until valid,
|
|
// - Pushes do not interfere with pop-related members -- except for
|
|
// mReusableBuffers, which is accessed atomically.
|
|
template <typename F>
|
|
void PopAll(F&& aF) {
|
|
mReaderLocking.Lock();
|
|
// Destroy every second fully-read buffer.
|
|
// TODO: Research a better algorithm, probably based on stats.
|
|
bool destroy = false;
|
|
for (;;) {
|
|
Buffer* b = mOldestBuffer;
|
|
MOZ_ASSERT(!b->Older());
|
|
// The next element to pop must be in that oldest buffer.
|
|
MOZ_ASSERT(mNextElementToPop >= b->Origin());
|
|
MOZ_ASSERT(mNextElementToPop < b->Origin() + BufferSize);
|
|
|
|
// Start reading each element.
|
|
if (!b->ReadAndInvalidateAll(aF, mNextElementToPop)) {
|
|
// Found an invalid element, stop popping.
|
|
mReaderLocking.Unlock();
|
|
return;
|
|
}
|
|
|
|
// Reached the end of this oldest buffer
|
|
MOZ_ASSERT(mNextElementToPop == b->Origin() + BufferSize);
|
|
// Delete this oldest buffer.
|
|
// Since the last element was valid, it must mean that there is a newer
|
|
// buffer.
|
|
MOZ_ASSERT(b->Newer());
|
|
MOZ_ASSERT(mNextElementToPop == b->Newer()->Origin());
|
|
StopUsing(b, destroy);
|
|
destroy = !destroy;
|
|
|
|
// We will loop and start reading the now-oldest buffer.
|
|
}
|
|
}
|
|
|
|
// Size of all buffers (used, or recyclable), excluding external data.
|
|
size_t ShallowSizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
  // The figure is derived from the allocated-buffer counter and the static
  // size of a Buffer; aMallocSizeOf is accepted but not consulted.
  const auto allocatedBuffers = mAllocatedBuffersStats.Count();
  return allocatedBuffers * sizeof(Buffer);
}
|
|
|
|
// Plain snapshot of a counter together with the highest value recorded for
// it. Members are zero-initialized so that a default-constructed snapshot
// never exposes indeterminate values; brace-initialization with explicit
// values (`CountAndWatermark{count, watermark}`) keeps working.
struct CountAndWatermark {
  // Value of the counter when the snapshot was taken.
  int mCount = 0;
  // Highest counter value recorded up to the snapshot.
  int mWatermark = 0;
};
|
|
|
|
CountAndWatermark LiveBuffersStats() const {
  // Snapshot of the live-buffer counter and its watermark.
  return mLiveBuffersStats.Get();
}
|
|
CountAndWatermark ReusableBuffersStats() const {
  // Snapshot of the reusable-buffer counter and its watermark.
  const CountAndWatermark snapshot = mReusableBuffersStats.Get();
  return snapshot;
}
|
|
CountAndWatermark AllocatedBuffersStats() const {
  // Snapshot of the allocated-buffer counter and its watermark.
  const CountAndWatermark snapshot = mAllocatedBuffersStats.Get();
  return snapshot;
}
|
|
|
|
private:
|
|
// Structure containing the element to be stored, and a validity-marker.
|
|
class BufferedElement {
|
|
public:
|
|
// Run aF on an invalid element, and mark it as valid.
|
|
template <typename F>
|
|
void SetAndValidate(F&& aF, Index aIndex) {
|
|
MOZ_ASSERT(!mValid);
|
|
aF(mT, aIndex);
|
|
mValid = true;
|
|
}
|
|
|
|
// Run aF on a valid element and mark it as invalid, return true.
|
|
// Return false if element was invalid.
|
|
template <typename F>
|
|
bool ReadAndInvalidate(F&& aF) {
|
|
if (!mValid) {
|
|
return false;
|
|
}
|
|
aF(mT);
|
|
mValid = false;
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
T mT;
|
|
// mValid should be atomically changed to true *after* mT has been written,
|
|
// so that the reader can only see valid data.
|
|
// ReleaseAcquire, because when set to `true`, we want the just-written mT
|
|
// to be visible to the thread reading this `true`; and when set to `false`,
|
|
// we want the previous reads to have completed.
|
|
Atomic<bool, ReleaseAcquire> mValid{false};
|
|
};
|
|
|
|
// Buffer contains a sequence of BufferedElements starting at a specific
|
|
// index, and it points to the next-older buffer (if any).
|
|
class Buffer {
|
|
public:
|
|
// Constructor of the very first buffer.
|
|
Buffer() : mOlder(nullptr), mNewer(nullptr), mOrigin(0) {}
|
|
|
|
// Constructor of later buffers.
|
|
Buffer(Buffer* aOlder, Index aOrigin)
|
|
: mOlder(aOlder), mNewer(nullptr), mOrigin(aOrigin) {
|
|
MOZ_ASSERT(aOlder);
|
|
aOlder->mNewer = this;
|
|
}
|
|
|
|
Buffer* Older() const { return mOlder; }
|
|
void SetOlder(Buffer* aOlder) { mOlder = aOlder; }
|
|
|
|
Buffer* Newer() const { return mNewer; }
|
|
void SetNewer(Buffer* aNewer) { mNewer = aNewer; }
|
|
|
|
Index Origin() const { return mOrigin; }
|
|
void SetOrigin(Index aOrigin) { mOrigin = aOrigin; }
|
|
|
|
// Run aF on a yet-invalid element.
|
|
// Not thread-safe by itself, but nothing else should write this element,
|
|
// and reader won't access it until after it becomes valid.
|
|
template <typename F>
|
|
void SetAndValidateElement(F&& aF, Index aIndex) {
|
|
MOZ_ASSERT(aIndex >= Origin());
|
|
MOZ_ASSERT(aIndex < Origin() + BufferSize);
|
|
mElements[aIndex - Origin()].SetAndValidate(aF, aIndex);
|
|
}
|
|
|
|
using DidReadLastElement = bool;
|
|
|
|
// Read all valid elements starting at aIndex, marking them invalid and
|
|
// updating aIndex.
|
|
// Returns true if we ended up reading the last element in this buffer.
|
|
// Accessing the validity bit is thread-safe (as it's atomic), but once
|
|
// an element is valid, the reading itself is not thread-safe and should be
|
|
// guarded.
|
|
template <typename F>
|
|
DidReadLastElement ReadAndInvalidateAll(F&& aF, Index& aIndex) {
|
|
MOZ_ASSERT(aIndex >= Origin());
|
|
MOZ_ASSERT(aIndex < Origin() + BufferSize);
|
|
for (; aIndex < Origin() + BufferSize; ++aIndex) {
|
|
if (!mElements[aIndex - Origin()].ReadAndInvalidate(aF)) {
|
|
// Found an invalid element, stop here. (aIndex will not be updated
|
|
// past it, so we will start from here next time.)
|
|
return false;
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
private:
|
|
Buffer* mOlder;
|
|
Buffer* mNewer;
|
|
Index mOrigin;
|
|
BufferedElement mElements[BufferSize];
|
|
};
|
|
|
|
// Reuse a buffer, or create a new one.
|
|
// All buffered elements will be invalid.
|
|
Buffer* NewBuffer(Buffer* aOlder, Index aOrigin) {
  MOZ_ASSERT(aOlder);

  // First choice: pop a buffer off the reusable stack. Reload the head and
  // retry whenever the compare-exchange loses against a concurrent update.
  for (Buffer* candidate = mReusableBuffers; candidate;
       candidate = mReusableBuffers) {
    Buffer* const nextInStack = candidate->Older();
    if (!mReusableBuffers.compareExchange(candidate, nextInStack)) {
      // The stack head changed under us; try again with the new head.
      continue;
    }

    // The ex-head is ours: account for it and splice it in after aOlder.
    --mReusableBuffersStats;
    ++mLiveBuffersStats;
    candidate->SetOlder(aOlder);
    aOlder->SetNewer(candidate);
    // It becomes the newest buffer; its newer-link should already be null.
    MOZ_ASSERT(!candidate->Newer());
    candidate->SetOrigin(aOrigin);
    return candidate;
  }

  // Nothing to recycle: allocate a brand new buffer chained after aOlder.
  ++mAllocatedBuffersStats;
  ++mLiveBuffersStats;
  return new Buffer(aOlder, aOrigin);
}
|
|
|
|
// Discard a fully-read buffer.
|
|
// If aDestroy is true, delete it.
|
|
// If aDestroy is false, move the buffer to a reusable-buffer stack.
|
|
void StopUsing(Buffer* aBuffer, bool aDestroy) {
  --mLiveBuffersStats;

  // Only the oldest buffer may be retired, and it must have a successor that
  // points back at it.
  MOZ_ASSERT(!aBuffer->Older());
  MOZ_ASSERT(aBuffer->Newer());
  MOZ_ASSERT(aBuffer->Newer()->Older() == aBuffer);

  // Unhook from the successor, which becomes the new oldest buffer.
  Buffer* const successor = aBuffer->Newer();
  successor->SetOlder(nullptr);
  mOldestBuffer = successor;

  if (aDestroy) {
    --mAllocatedBuffersStats;
    delete aBuffer;
    return;
  }

  // Recycle: the reusable stack is linked through mOlder only, so clear the
  // newer-link before pushing.
  ++mReusableBuffersStats;
  aBuffer->SetNewer(nullptr);

  // Push onto the reusable stack; retry until the compare-exchange on the
  // head succeeds.
  Buffer* observedHead;
  do {
    observedHead = mReusableBuffers;
    aBuffer->SetOlder(observedHead);
  } while (!mReusableBuffers.compareExchange(observedHead, aBuffer));
}
|
|
|
|
// Index of the next element to write. Modified when an element index is
|
|
// claimed for a push. If the last element of a buffer is claimed, that push
|
|
// will be responsible for adding a new head buffer.
|
|
// Relaxed, because there is no synchronization based on this variable, each
|
|
// thread just needs to get a different value, and will then write different
|
|
// things (which themselves have some atomic validation before they may be
|
|
// read elsewhere, independent of this `mNextElementToWrite`.)
|
|
Atomic<Index::ValueType, Relaxed> mNextElementToWrite{0};
|
|
|
|
// Index that a live recent buffer reaches. If a push claims a lesser-or-
|
|
// equal number, the corresponding buffer is guaranteed to still be alive:
|
|
// - It will have been created before this index was updated,
|
|
// - It will not be destroyed until all its values have been written,
|
|
// including the one that just claimed a position within it.
|
|
// Also, the push that claims this exact number is responsible for adding the
|
|
// next buffer and updating this value accordingly.
|
|
// ReleaseAcquire, because when set to a certain value, the just-created
|
|
// buffer covering the new range must be visible to readers.
|
|
Atomic<Index::ValueType, ReleaseAcquire> mBuffersCoverAtLeastUpTo;
|
|
|
|
// Pointer to the most recent buffer. Never null.
|
|
// This is the most recent of a deque of yet-unread buffers.
|
|
// Only modified when adding a new head buffer.
|
|
// ReleaseAcquire, because when modified, the just-created new buffer must be
|
|
// visible to readers.
|
|
Atomic<Buffer*, ReleaseAcquire> mMostRecentBuffer;
|
|
|
|
// Stack of reusable buffers.
|
|
// ReleaseAcquire, because when modified, the just-added buffer must be
|
|
// visible to readers.
|
|
Atomic<Buffer*, ReleaseAcquire> mReusableBuffers;
|
|
|
|
// Template-provided locking mechanism to protect PopAll()-only member
|
|
// variables below.
|
|
ReaderLocking mReaderLocking;
|
|
|
|
// Pointer to the oldest buffer, which contains the next element to be popped.
|
|
// Never null.
|
|
Buffer* mOldestBuffer;
|
|
|
|
// Index of the next element to be popped.
|
|
Index mNextElementToPop{0};
|
|
|
|
// Stats.
|
|
class AtomicCountAndWatermark {
|
|
public:
|
|
explicit AtomicCountAndWatermark(int aCount)
|
|
: mCount(aCount), mWatermark(aCount) {}
|
|
|
|
int Count() const { return int(mCount); }
|
|
|
|
CountAndWatermark Get() const {
|
|
return CountAndWatermark{int(mCount), int(mWatermark)};
|
|
}
|
|
|
|
int operator++() {
|
|
int count = int(++mCount);
|
|
// Update watermark.
|
|
for (;;) {
|
|
int watermark = int(mWatermark);
|
|
if (watermark >= count) {
|
|
// printf("++[%p] -=> %d-%d\n", this, count, watermark);
|
|
break;
|
|
}
|
|
if (mWatermark.compareExchange(watermark, count)) {
|
|
// printf("++[%p] -x> %d-(was %d now %d)\n", this, count, watermark,
|
|
// count);
|
|
break;
|
|
}
|
|
}
|
|
return count;
|
|
}
|
|
|
|
int operator--() {
|
|
int count = int(--mCount);
|
|
// printf("--[%p] -> %d\n", this, count);
|
|
return count;
|
|
}
|
|
|
|
private:
|
|
// Relaxed, as these are just gathering stats, so consistency is not
|
|
// critical.
|
|
Atomic<int, Relaxed> mCount;
|
|
Atomic<int, Relaxed> mWatermark;
|
|
};
|
|
// All buffers in the mMostRecentBuffer deque.
|
|
AtomicCountAndWatermark mLiveBuffersStats;
|
|
// All buffers in the mReusableBuffers stack.
|
|
AtomicCountAndWatermark mReusableBuffersStats;
|
|
// All allocated buffers (sum of above).
|
|
AtomicCountAndWatermark mAllocatedBuffersStats;
|
|
};
|
|
|
|
} // namespace mozilla
|
|
|
|
#endif // mozilla_MultiWriterQueue_h_
|