From 304ef97245244e98cf8e3324eb348d9c01717f59 Mon Sep 17 00:00:00 2001 From: Paul Adenot Date: Tue, 13 Dec 2022 19:30:46 +0100 Subject: [PATCH] Add a triple_buffer class that allows publishing data in a wait-free manner from a real-time thread to another thread. It's largely inspired from the crate cubeb-coreaudio-rs uses, but ported to C++. --- CMakeLists.txt | 1 + src/cubeb_triple_buffer.h | 80 +++++++++++++++++++++++++++++++++++++ test/test_triple_buffer.cpp | 67 +++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 src/cubeb_triple_buffer.h create mode 100644 test/test_triple_buffer.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e7de39f..11951cf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -400,6 +400,7 @@ if(BUILD_TESTS) cubeb_add_test(duplex) cubeb_add_test(logging) + cubeb_add_test(triple_buffer) if (USE_WASAPI) cubeb_add_test(overload_callback) diff --git a/src/cubeb_triple_buffer.h b/src/cubeb_triple_buffer.h new file mode 100644 index 0000000..a5a5978 --- /dev/null +++ b/src/cubeb_triple_buffer.h @@ -0,0 +1,80 @@ +/* + * Copyright © 2022 Mozilla Foundation + * + * This program is made available under an ISC-style license. See the + * accompanying file LICENSE for details. + */ + +/** + * Adapted and ported to C++ from https://crates.io/crates/triple_buffer + */ + +#ifndef CUBEB_TRIPLE_BUFFER +#define CUBEB_TRIPLE_BUFFER + +#include + +// Single producer / single consumer wait-free triple buffering +// implementation, for when a producer wants to publish data to a consumer +// without blocking, but when a queue is wastefull, because it's OK for the +// consumer to miss data updates. +template class triple_buffer { +public: + // Write a new value into the triple buffer. Returns true if a value was + // overwritten. + // Producer-side only. + bool write(T & input) + { + storage[input_idx] = input; + return publish(); + } + // Get the latest value from the triple buffer. + // Consumer-side only. + T & read() + { + update(); + return storage[output_idx]; + } + // Returns true if a new value has been published by the consumer without + // having been consumed yet. + // Consumer-side only. + bool updated() + { + return (shared_state.load(std::memory_order_relaxed) & BACK_DIRTY_BIT) != 0; + } + +private: + // Publish a value to the consumer. Returns true if the data was overwritten + // without having been read. + bool publish() + { + auto former_back_idx = shared_state.exchange(input_idx | BACK_DIRTY_BIT, + std::memory_order_acq_rel); + input_idx = former_back_idx & BACK_INDEX_MASK; + return (former_back_idx & BACK_DIRTY_BIT) != 0; + } + // Get a new value from the producer, if a new value has been produced. + bool update() + { + bool was_updated = updated(); + if (was_updated) { + auto former_back_idx = + shared_state.exchange(output_idx, std::memory_order_acq_rel); + output_idx = former_back_idx & BACK_INDEX_MASK; + } + return was_updated; + } + T storage[3]; + // Mask used to extract back-buffer index + const uint8_t BACK_INDEX_MASK = 0b11; + // Bit set by producer to signal updates + const uint8_t BACK_DIRTY_BIT = 0b100; + // Shared state: a dirty bit, and an index. + std::atomic shared_state = {0}; + // Output index, private to the consumer. + uint8_t output_idx = 1; + // Input index, private to the producer. + uint8_t input_idx = 2; +}; + +#endif // CUBEB_TRIPLE_BUFFER diff --git a/test/test_triple_buffer.cpp b/test/test_triple_buffer.cpp new file mode 100644 index 0000000..a6e0049 --- /dev/null +++ b/test/test_triple_buffer.cpp @@ -0,0 +1,67 @@ +/* + * Copyright © 2022 Mozilla Foundation + * + * This program is made available under an ISC-style license. See the + * accompanying file LICENSE for details. + */ + +/* cubeb_triple_buffer test */ +#include "gtest/gtest.h" +#if !defined(_XOPEN_SOURCE) +#define _XOPEN_SOURCE 600 +#endif +#include "cubeb/cubeb.h" +#include "cubeb_triple_buffer.h" +#include +#include +#include +#include +#include +#include + +#include "common.h" + +TEST(cubeb, triple_buffer) +{ + struct AB { + uint64_t a; + uint64_t b; + }; + triple_buffer buffer; + + std::atomic finished = {false}; + + ASSERT_TRUE(!buffer.updated()); + + auto t = std::thread([&finished, &buffer] { + AB ab; + ab.a = 0; + ab.b = UINT64_MAX; + uint64_t counter = 0; + do { + buffer.write(ab); + ab.a++; + ab.b--; + } while (counter++ < 1e6 && ab.a <= UINT64_MAX && ab.b != 0); + finished.store(true); + }); + + AB ab; + AB old_ab; + old_ab.a = 0; + old_ab.b = UINT64_MAX; + + // Wait to have at least one value produced. + while (!buffer.updated()) { + } + + // Check that the values are increasing (resp. descreasing) monotonically. + while (!finished) { + ab = buffer.read(); + ASSERT_GE(ab.a, old_ab.a); + ASSERT_LE(ab.b, old_ab.b); + old_ab = ab; + } + + t.join(); +}