Implement a lock-free single consumer single producer queue.

Paul Adenot 2016-12-01 14:55:10 +13:00 committed by Matthew Gregan
Parent 9770b85bf7
Commit 4aa67a9a9a
4 changed files with 802 additions and 0 deletions

CMakeLists.txt

@@ -204,3 +204,9 @@ target_include_directories(test_utils PRIVATE ${gtest_SOURCE_DIR}/include)
target_include_directories(test_utils PRIVATE src)
target_link_libraries(test_utils PRIVATE cubeb gtest_main)
add_test(utils test_utils)
add_executable(test_ring_buffer test/test_ring_buffer.cpp)
target_include_directories(test_ring_buffer PRIVATE ${gtest_SOURCE_DIR}/include)
target_include_directories(test_ring_buffer PRIVATE src)
target_link_libraries(test_ring_buffer PRIVATE cubeb gtest_main)
add_test(ring_buffer test_ring_buffer)

src/cubeb_ringbuffer.h (new file, 532 lines)

@@ -0,0 +1,532 @@
/*
* Copyright © 2016 Mozilla Foundation
*
* This program is made available under an ISC-style license. See the
* accompanying file LICENSE for details.
*/
#ifndef CUBEB_RING_BUFFER_H
#define CUBEB_RING_BUFFER_H
#include "cubeb_utils.h"
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <memory>
#include <thread>
/* This enum allows choosing the behaviour of the queue. */
enum ThreadSafety
{
/* No attempt to synchronize the queue is made. The queue is only safe when
* used on a single thread. */
Unsafe,
/** Atomics are used to synchronize read and write. The queue is safe when
* used from two threads: one producer, one consumer. */
Safe
};
/** Policy to enable thread safety on the queue. */
template<ThreadSafety>
struct ThreadSafePolicy;
typedef int RingBufferIndex;
/** Policy for thread-safe internal index for the queue. */
template<>
struct ThreadSafePolicy<Safe>
{
typedef std::atomic<RingBufferIndex> IndexType;
};
/**
* This is the version with a simple `int` for index, for use when only a single
* thread is producing and consuming data.
*/
template<>
struct ThreadSafePolicy<Unsafe>
{
typedef RingBufferIndex IndexType;
};
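/*
 * Note: with `Safe`, the read and write indices of the queue are
 * std::atomic<RingBufferIndex>, so plain loads and stores on them default to
 * sequentially consistent ordering; with `Unsafe`, they are plain integers
 * and provide no ordering guarantees at all.
 */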
/**
* Single producer single consumer lock-free and wait-free ring buffer.
*
* This data structure allows producing data from one thread, and consuming it on
* another thread, safely and without explicit synchronization. If used on two
* threads, this data structure uses atomics for thread safety. It is possible
* to disable the use of atomics at compile time and only use this data
* structure on one thread.
*
* The role for the producer and the consumer must be constant, i.e., the
* producer should always be on one thread and the consumer should always be on
* another thread.
*
* Some words about the inner workings of this class:
* - Capacity is fixed. Only one allocation is performed, in the constructor.
* When reading and writing, the return value of the method allows checking if
* the ring buffer is empty or full.
* - One slot is always kept free, so an empty ring buffer can be
* distinguished from a full one: the buffer is empty when the write index is
* at the same position as the read index, and full when the write index is
* exactly one position before the read index.
* - We synchronize the update of the read index after having read the data,
* and of the write index after having written the data. This means that each
* thread only touches a portion of the buffer that the other thread does not
* touch.
* - Callers are expected to provide buffers. When writing to the queue,
* elements are copied into the internal storage from the buffer passed in.
* When reading from the queue, the user is expected to provide a buffer.
* Because this is a ring buffer, data might not be contiguous in memory,
* providing an external buffer to copy into is an easy way to have linear
* data for further processing.
*/
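/*
 * Example (a sketch of the intended call pattern, not part of the API):
 *
 *   lock_free_queue<float> queue(128);   // alias defined at the end of this file
 *
 *   // On the producer thread:
 *   float input[32] = {};
 *   int written = queue.enqueue(input, 32);   // <= 32 if the queue is almost full
 *
 *   // On the consumer thread:
 *   float output[32];
 *   int read = queue.dequeue(output, 32);     // <= 32 if fewer elements are available
 */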
template <typename T,
ThreadSafety Safety = ThreadSafety::Safe>
class ring_buffer_base
{
public:
/**
* Constructor for a ring buffer.
*
* This performs an allocation, but is the only allocation that will happen
* for the lifetime of a `ring_buffer_base`.
*
* @param capacity The maximum number of elements this ring buffer will hold.
*/
ring_buffer_base(RingBufferIndex capacity)
/* One more element to distinguish between an empty and a full buffer. */
: capacity_(capacity + 1)
{
assert(storage_capacity() <
std::numeric_limits<RingBufferIndex>::max() / 2 &&
"buffer too large for the type of index used.");
assert(capacity_ > 0);
data_.reset(new T[storage_capacity()]);
/* If this queue is using atomics, initializing those members as the last
* action in the constructor acts as a full barrier, and allows capacity() to
* be thread-safe. */
write_index_ = 0;
read_index_ = 0;
}
/**
* Push `count` zero or default-constructed elements into the ring buffer.
*
* Only safely called on the producer thread.
*
* @param count The number of elements to enqueue.
* @return The number of elements enqueued.
*/
RingBufferIndex enqueue_default(RingBufferIndex count)
{
return enqueue(nullptr, count);
}
/**
* @brief Put an element in the queue
*
* Only safely called on the producer thread.
*
* @param element The element to put in the queue.
*
* @return 1 if the element was inserted, 0 otherwise.
*/
RingBufferIndex enqueue(T& element)
{
return enqueue(&element, 1);
}
/**
* Push `count` elements in the ring buffer.
*
* Only safely called on the producer thread.
*
* @param elements a pointer to a buffer containing at least `count` elements.
* If `elements` is nullptr, zero or default constructed elements are enqueued.
* @param count The number of elements to read from `elements`.
* @return The number of elements successfully copied from `elements` and inserted
* into the ring buffer.
*/
RingBufferIndex enqueue(T * elements, RingBufferIndex count)
{
#ifndef NDEBUG
assert_correct_thread(producer_id);
#endif
RingBufferIndex rd_idx = read_index_;
RingBufferIndex wr_idx = write_index_;
if (full_internal(rd_idx, wr_idx)) {
return 0;
}
RingBufferIndex to_write =
std::min(available_write_internal(rd_idx, wr_idx), count);
/* First part, from the write index to the end of the array. */
RingBufferIndex first_part = std::min(storage_capacity() - wr_idx,
to_write);
/* Second part, from the beginning of the array */
RingBufferIndex second_part = to_write - first_part;
if (elements) {
Copy(data_.get() + wr_idx, elements, first_part);
Copy(data_.get(), elements + first_part, second_part);
} else {
ConstructDefault(data_.get() + wr_idx, first_part);
ConstructDefault(data_.get(), second_part);
}
write_index_ = increment_index(wr_idx, to_write);
return to_write;
}
/**
* Retrieve at most `count` elements from the ring buffer, and copy them to
* `elements`, if non-null.
*
* Only safely called on the consumer side.
*
* @param elements A pointer to a buffer with space for at least `count`
* elements. If `elements` is `nullptr`, `count` elements will be discarded.
* @param count The maximum number of elements to dequeue.
* @return The number of elements written to `elements`.
*/
RingBufferIndex dequeue(T * elements, RingBufferIndex count)
{
#ifndef NDEBUG
assert_correct_thread(consumer_id);
#endif
RingBufferIndex wr_idx = write_index_;
RingBufferIndex rd_idx = read_index_;
if (empty_internal(rd_idx, wr_idx)) {
return 0;
}
RingBufferIndex to_read =
std::min(available_read_internal(rd_idx, wr_idx), count);
RingBufferIndex first_part = std::min(storage_capacity() - rd_idx, to_read);
RingBufferIndex second_part = to_read - first_part;
if (elements) {
Copy(elements, data_.get() + rd_idx, first_part);
Copy(elements + first_part, data_.get(), second_part);
}
read_index_ = increment_index(rd_idx, to_read);
return to_read;
}
/**
* Get the number of available elements for consuming.
*
* Only safely called on the consumer thread.
*
* @return The number of available elements for reading.
*/
RingBufferIndex available_read() const
{
#ifndef NDEBUG
assert_correct_thread(consumer_id);
#endif
return available_read_internal(read_index_, write_index_);
}
/**
* Get the number of free slots available for writing.
*
* Only safely called on the producer thread.
*
* @return The number of empty slots in the buffer, available for writing.
*/
RingBufferIndex available_write() const
{
#ifndef NDEBUG
assert_correct_thread(producer_id);
#endif
return available_write_internal(read_index_, write_index_);
}
/**
* Get the total capacity of this ring buffer.
*
* Can be called safely on any thread.
*
* @return The maximum capacity of this ring buffer.
*/
RingBufferIndex capacity() const
{
return storage_capacity() - 1;
}
private:
/** Return true if the ring buffer is empty.
*
* @param read_index the read index to consider
* @param write_index the write index to consider
* @return true if the ring buffer is empty, false otherwise.
**/
bool empty_internal(RingBufferIndex read_index,
RingBufferIndex write_index) const
{
return write_index == read_index;
}
/** Return true if the ring buffer is full.
*
* This happens if the write index is exactly one element behind the read
* index.
*
* @param read_index the read index to consider
* @param write_index the write index to consider
* @return true if the ring buffer is full, false otherwise.
**/
bool full_internal(RingBufferIndex read_index,
RingBufferIndex write_index) const
{
return (write_index + 1) % storage_capacity() == read_index;
}
/**
* Return the size of the storage. It is one more than the number of elements
* that can be stored in the buffer.
*
* @return the size of the storage, in elements.
*/
int storage_capacity() const
{
return capacity_;
}
/**
* Returns the number of elements available for reading.
*
* @return the number of available elements for reading.
*/
RingBufferIndex
available_read_internal(RingBufferIndex read_index,
RingBufferIndex write_index) const
{
if (write_index >= read_index) {
return write_index - read_index;
} else {
return write_index + storage_capacity() - read_index;
}
}
/**
* Returns the number of empty elements, available for writing.
*
* @return the number of elements that can be written into the array.
*/
RingBufferIndex
available_write_internal(RingBufferIndex read_index,
RingBufferIndex write_index) const
{
/* We subtract one element here to always keep at least one slot
* free in the buffer, to distinguish between a full and an empty buffer. */
int rv = read_index - write_index - 1;
if (write_index >= read_index) {
rv += storage_capacity();
}
return rv;
}
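/*
 * Worked example for the arithmetic above (a sketch): with a requested
 * capacity of 4, storage_capacity() is 5. If read_index_ is 3 and
 * write_index_ is 1, the readable elements sit at positions 3, 4 and 0, so
 * available_read_internal(3, 1) is 1 + 5 - 3 == 3 and
 * available_write_internal(3, 1) is 3 - 1 - 1 == 1. After writing that one
 * element the write index becomes 2, and full_internal(3, 2) holds because
 * (2 + 1) % 5 == 3.
 */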
/**
* Increments an index, wrapping it around the storage.
*
* @param index the index to increment.
* @param increment the number by which `index` is incremented.
* @return the new index.
*/
RingBufferIndex
increment_index(RingBufferIndex index, RingBufferIndex increment) const
{
assert(increment >= 0);
return (index + increment) % storage_capacity();
}
/**
* @brief This allows checking that enqueue (resp. dequeue) are always called
* by the right thread.
*
* @param id the id of the thread that first called the enclosing method.
*/
#ifndef NDEBUG
static void assert_correct_thread(std::thread::id& id)
{
if (id == std::thread::id()) {
id = std::this_thread::get_id();
return;
}
assert(id == std::this_thread::get_id());
}
#endif
/** Index at which the oldest element is. */
typename ThreadSafePolicy<Safety>::IndexType read_index_;
/** Index at which to write new elements. Writes never advance onto
* `read_index_`: at least one slot is always left free, so a full buffer is
* never mistaken for an empty one. */
typename ThreadSafePolicy<Safety>::IndexType write_index_;
/** Size of the storage, one more than the number of elements it can hold. */
const int capacity_;
/** Data storage */
std::unique_ptr<T[]> data_;
#ifndef NDEBUG
/** The id of the only thread that is allowed to read from the queue. */
mutable std::thread::id consumer_id;
/** The id of the only thread that is allowed to write to the queue. */
mutable std::thread::id producer_id;
#endif
};
/**
* Adapter for `ring_buffer_base` that exposes an interface in frames.
*/
template <typename T,
ThreadSafety Safety = ThreadSafety::Safe>
class audio_ring_buffer_base
{
public:
/**
* @brief Constructor.
*
* @param channel_count Number of channels.
* @param capacity_in_frames The capacity in frames.
*/
audio_ring_buffer_base(int channel_count, int capacity_in_frames)
: channel_count(channel_count)
, ring_buffer(frames_to_samples(capacity_in_frames))
{
assert(channel_count > 0);
}
/**
* @brief Enqueue silence.
*
* Only safely called on the producer thread.
*
* @param frame_count The number of frames of silence to enqueue.
* @return The number of frames of silence actually written to the queue.
*/
int enqueue_default(int frame_count)
{
return samples_to_frames(ring_buffer.enqueue(nullptr, frames_to_samples(frame_count)));
}
/**
* @brief Enqueue `frame_count` frames of audio.
*
* Only safely called from the producer thread.
*
* @param [in] frames If non-null, the frames to enqueue.
* Otherwise, silent frames are enqueued.
* @param frame_count The number of frames to enqueue.
*
* @return The number of frames enqueued
*/
int enqueue(T * frames, int frame_count)
{
return samples_to_frames(ring_buffer.enqueue(frames, frames_to_samples(frame_count)));
}
/**
* @brief Remove `frame_count` frames from the buffer, and write them
* to `frames` if it is non-null.
*
* Only safely called on the consumer thread.
*
* @param frames If non-null, the frames are copied to `frames`.
* Otherwise, they are dropped.
* @param frame_count The number of frames to remove.
*
* @return The number of frames actually dequeued.
*/
int dequeue(T * frames, int frame_count)
{
return samples_to_frames(ring_buffer.dequeue(frames, frames_to_samples(frame_count)));
}
/**
* Get the number of available frames of audio for consuming.
*
* Only safely called on the consumer thread.
*
* @return The number of available frames of audio for reading.
*/
int available_read() const
{
return samples_to_frames(ring_buffer.available_read());
}
/**
* Get the number of frames of audio that can still be written.
*
* Only safely called on the producer thread.
*
* @return The number of empty slots in the buffer, available for writing.
*/
int available_write() const
{
return samples_to_frames(ring_buffer.available_write());
}
/**
* Get the total capacity of this ring buffer, in frames.
*
* Can be called safely on any thread.
*
* @return The maximum capacity of this ring buffer.
*/
int capacity() const
{
return samples_to_frames(ring_buffer.capacity());
}
private:
/**
* @brief Frames to samples conversion.
*
* @param frames The number of frames.
*
* @return A number of samples.
*/
int frames_to_samples(int frames) const
{
return frames * channel_count;
}
/**
* @brief Samples to frames conversion.
*
* @param samples The number of samples.
*
* @return A number of frames.
*/
int samples_to_frames(int samples) const
{
return samples / channel_count;
}
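/*
 * For example, with channel_count == 2, a block of 128 frames is
 * frames_to_samples(128) == 256 samples in the underlying ring buffer, and a
 * dequeue that returns 256 samples is reported to the caller as
 * samples_to_frames(256) == 128 frames.
 */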
/** Number of channels of audio that will stream through this ring buffer. */
int channel_count;
/** The underlying ring buffer that is used to store the data. */
ring_buffer_base<T, Safety> ring_buffer;
};
/**
* Lock-free instantiation of the `ring_buffer_base` type. This is safe to use
* from two threads, one producer, one consumer (that never change role),
* without explicit synchronization.
*/
template<typename T>
using lock_free_queue = ring_buffer_base<T, Safe>;
/**
* An instantiation of the `ring_buffer_base` type, to be used on a single
* thread: it is not safe to use from multiple threads without explicit external
* synchronization.
*/
template<typename T>
using queue = ring_buffer_base<T, Unsafe>;
/**
* Lock-free instantiation of the `audio_ring_buffer_base` type. This is safe to use
* from two threads, one producer, one consumer (that never change role),
* without explicit synchronization.
*/
template<typename T>
using lock_free_audio_ring_buffer = audio_ring_buffer_base<T, Safe>;
/**
* An instantiation of the `audio_ring_buffer_base` type, to be used on a single
* thread: it is not safe to use from multiple threads without explicit external
* synchronization.
*/
template<typename T>
using audio_ring_buffer = audio_ring_buffer_base<T, Unsafe>;
#endif // CUBEB_RING_BUFFER_H
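To illustrate how the aliases above are meant to be used, here is a minimal, self-contained sketch of a producer/consumer pair built on `lock_free_audio_ring_buffer`. The block size, capacity and iteration counts are arbitrary values chosen for the example, the producer only enqueues silence, and the include assumes the `src` directory is on the include path; the point is that each role stays pinned to a single thread.

#include "cubeb_ringbuffer.h"
#include <thread>

int main()
{
  constexpr int channels = 2;
  constexpr int capacity_frames = 1024;
  constexpr int block_frames = 128;
  lock_free_audio_ring_buffer<float> buffer(channels, capacity_frames);

  /* Producer: always the same thread for the lifetime of the buffer. */
  std::thread producer([&buffer] {
    float block[block_frames * channels] = {}; /* a block of silence */
    for (int i = 0; i < 100; i++) {
      /* enqueue returns how many frames actually fit; a short write here
       * simply means the consumer has not caught up yet. */
      buffer.enqueue(block, block_frames);
    }
  });

  /* Consumer: this thread drains whatever is currently available. */
  float out[block_frames * channels];
  for (int i = 0; i < 100; i++) {
    int got = buffer.dequeue(out, block_frames); /* got <= block_frames */
    (void)got;
  }

  producer.join();
  return 0;
}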

src/cubeb_utils.h

@@ -46,6 +46,63 @@ void PodZero(T * destination, size_t count)
memset(destination, 0, count * sizeof(T));
}
namespace {
template<typename T, typename Trait>
void Copy(T * destination, const T * source, size_t count, Trait)
{
for (size_t i = 0; i < count; i++) {
destination[i] = source[i];
}
}
template<typename T>
void Copy(T * destination, const T * source, size_t count, std::true_type)
{
PodCopy(destination, source, count);
}
}
/**
* This allows copying a number of elements from a `source` pointer to a
* `destination` pointer, using `memcpy` if it is safe to do so, or a loop
* that copy-assigns each element otherwise.
*/
template<typename T>
void Copy(T * destination, const T * source, size_t count)
{
assert(destination && source);
Copy(destination, source, count, typename std::is_trivial<T>::type());
}
namespace {
template<typename T, typename Trait>
void ConstructDefault(T * destination, size_t count, Trait)
{
for (size_t i = 0; i < count; i++) {
destination[i] = T();
}
}
template<typename T>
void ConstructDefault(T * destination,
size_t count, std::true_type)
{
PodZero(destination, count);
}
}
/**
* This allows zeroing a number of elements (using memset) for arithmetic
* types, or assigning a default-constructed value to each element otherwise.
*/
template<typename T>
void ConstructDefault(T * destination, size_t count)
{
assert(destination);
ConstructDefault(destination, count,
typename std::is_arithmetic<T>::type());
}
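/*
 * For example, Copy<float> and ConstructDefault<float> resolve to the
 * PodCopy/PodZero fast paths above, while a type with a non-trivial copy
 * (e.g. a struct that owns a std::string) falls back to the
 * element-by-element loops.
 */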
template<typename T>
class auto_array
{

test/test_ring_buffer.cpp (new file, 207 lines)

@@ -0,0 +1,207 @@
/*
* Copyright © 2016 Mozilla Foundation
*
* This program is made available under an ISC-style license. See the
* accompanying file LICENSE for details.
*/
#define NOMINMAX
#include "gtest/gtest.h"
#include "cubeb_ringbuffer.h"
#include <iostream>
#include <thread>
#include <chrono>
/* Generate a monotonically increasing sequence of numbers. */
template<typename T>
class sequence_generator
{
public:
sequence_generator(size_t channels)
: channels(channels)
{ }
void get(T * elements, size_t frames)
{
for (size_t i = 0; i < frames; i++) {
for (size_t c = 0; c < channels; c++) {
elements[i * channels + c] = static_cast<T>(index_);
}
index_++;
}
}
void rewind(size_t frames)
{
index_ -= frames;
}
private:
size_t index_ = 0;
size_t channels = 0;
};
/* Checks that a sequence is monotonically increasing. */
template<typename T>
class sequence_verifier
{
public:
sequence_verifier(size_t channels)
: channels(channels)
{ }
void check(T * elements, size_t frames)
{
for (size_t i = 0; i < frames; i++) {
for (size_t c = 0; c < channels; c++) {
if (elements[i * channels + c] != static_cast<T>(index_)) {
std::cerr << "Element " << i << " is different. Expected "
<< static_cast<T>(index_) << ", got " << elements[i * channels + c]
<< ". (channel count: " << channels << ")." << std::endl;
ASSERT_TRUE(false);
}
}
index_++;
}
}
private:
size_t index_ = 0;
size_t channels = 0;
};
template<typename T>
void test_ring(audio_ring_buffer<T>& buf, int channels, int capacity_frames)
{
std::unique_ptr<T[]> seq(new T[capacity_frames * channels]);
sequence_generator<T> gen(channels);
sequence_verifier<T> checker(channels);
int iterations = 1002;
const int block_size = 128;
while(iterations--) {
gen.get(seq.get(), block_size);
int rv = buf.enqueue(seq.get(), block_size);
ASSERT_EQ(rv, block_size);
PodZero(seq.get(), block_size * channels);
rv = buf.dequeue(seq.get(), block_size);
ASSERT_EQ(rv, block_size);
checker.check(seq.get(), block_size);
}
}
template<typename T>
void test_ring_multi(lock_free_audio_ring_buffer<T>& buf, int channels, int capacity_frames)
{
sequence_verifier<T> checker(channels);
std::unique_ptr<T[]> out_buffer(new T[capacity_frames * channels]);
const int block_size = 128;
std::thread t([&buf, capacity_frames, channels, block_size] {
int iterations = 1002;
std::unique_ptr<T[]> in_buffer(new T[capacity_frames * channels]);
sequence_generator<T> gen(channels);
while(iterations--) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
gen.get(in_buffer.get(), block_size);
int rv = buf.enqueue(in_buffer.get(), block_size);
ASSERT_TRUE(rv <= block_size);
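/* The queue may have been full: only `rv` frames were accepted, so rewind
 * the generator by the difference to keep the sequence contiguous. */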
if (rv != block_size) {
gen.rewind(block_size - rv);
}
}
});
int remaining = 1002;
while(remaining--) {
std::this_thread::sleep_for(std::chrono::microseconds(10));
int rv = buf.dequeue(out_buffer.get(), block_size);
ASSERT_TRUE(rv <= block_size);
checker.check(out_buffer.get(), rv);
}
t.join();
}
template<typename T>
void basic_api_test(T& ring)
{
ASSERT_EQ(ring.capacity(), 128);
ASSERT_EQ(ring.available_read(), 0);
ASSERT_EQ(ring.available_write(), 128);
int rv = ring.enqueue_default(63);
ASSERT_TRUE(rv == 63);
ASSERT_EQ(ring.available_read(), 63);
ASSERT_EQ(ring.available_write(), 65);
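/* 63 readable plus 65 writable equals 128, the full capacity(): the extra
 * storage slot used to tell a full buffer from an empty one is never exposed
 * through the API. */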
rv = ring.enqueue_default(65);
ASSERT_EQ(rv, 65);
ASSERT_EQ(ring.available_read(), 128);
ASSERT_EQ(ring.available_write(), 0);
rv = ring.dequeue(nullptr, 63);
ASSERT_EQ(ring.available_read(), 65);
ASSERT_EQ(ring.available_write(), 63);
rv = ring.dequeue(nullptr, 65);
ASSERT_EQ(ring.available_read(), 0);
ASSERT_EQ(ring.available_write(), 128);
}
TEST(cubeb, ring_buffer)
{
/* Basic API test. */
const int min_channels = 1;
const int max_channels = 10;
const int min_capacity = 199;
const int max_capacity = 1277;
const int capacity_increment = 27;
queue<float> q1(128);
basic_api_test(q1);
queue<short> q2(128);
basic_api_test(q2);
lock_free_queue<float> q3(128);
basic_api_test(q3);
lock_free_queue<short> q4(128);
basic_api_test(q4);
for (size_t channels = min_channels; channels < max_channels; channels++) {
audio_ring_buffer<float> q5(channels, 128);
basic_api_test(q5);
audio_ring_buffer<short> q6(channels, 128);
basic_api_test(q6);
lock_free_audio_ring_buffer<float> q7(channels, 128);
basic_api_test(q7);
lock_free_audio_ring_buffer<short> q8(channels, 128);
basic_api_test(q8);
}
/* Single thread testing. */
/* Test mono to 9.1 */
for (size_t channels = min_channels; channels < max_channels; channels++) {
/* Use non power-of-two numbers to catch edge-cases. */
for (size_t capacity_frames = min_capacity;
capacity_frames < max_capacity; capacity_frames+=capacity_increment) {
audio_ring_buffer<float> ring(channels, capacity_frames);
test_ring(ring, channels, capacity_frames);
}
}
/* Multi thread testing */
for (size_t channels = min_channels; channels < max_channels; channels++) {
/* Use non power-of-two numbers to catch edge-cases. */
for (size_t capacity_frames = min_capacity;
capacity_frames < max_capacity; capacity_frames+=capacity_increment) {
lock_free_audio_ring_buffer<short> ring(channels, capacity_frames);
test_ring_multi(ring, channels, capacity_frames);
}
}
}