зеркало из https://github.com/mozilla/cubeb.git
Add a triple_buffer class that allows publishing data in a wait-free manner from a real-time thread to another thread.
It's largely inspired from the crate cubeb-coreaudio-rs uses, but ported to C++.
This commit is contained in:
Родитель
e4da2d47c3
Коммит
304ef97245
|
@ -400,6 +400,7 @@ if(BUILD_TESTS)
|
|||
|
||||
cubeb_add_test(duplex)
|
||||
cubeb_add_test(logging)
|
||||
cubeb_add_test(triple_buffer)
|
||||
|
||||
if (USE_WASAPI)
|
||||
cubeb_add_test(overload_callback)
|
||||
|
|
|
@ -0,0 +1,80 @@
|
|||
/*
|
||||
* Copyright © 2022 Mozilla Foundation
|
||||
*
|
||||
* This program is made available under an ISC-style license. See the
|
||||
* accompanying file LICENSE for details.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Adapted and ported to C++ from https://crates.io/crates/triple_buffer
|
||||
*/
|
||||
|
||||
#ifndef CUBEB_TRIPLE_BUFFER
|
||||
#define CUBEB_TRIPLE_BUFFER
|
||||
|
||||
#include <atomic>
|
||||
|
||||
// Single producer / single consumer wait-free triple buffering
|
||||
// implementation, for when a producer wants to publish data to a consumer
|
||||
// without blocking, but when a queue is wastefull, because it's OK for the
|
||||
// consumer to miss data updates.
|
||||
template <typename T> class triple_buffer {
|
||||
public:
|
||||
// Write a new value into the triple buffer. Returns true if a value was
|
||||
// overwritten.
|
||||
// Producer-side only.
|
||||
bool write(T & input)
|
||||
{
|
||||
storage[input_idx] = input;
|
||||
return publish();
|
||||
}
|
||||
// Get the latest value from the triple buffer.
|
||||
// Consumer-side only.
|
||||
T & read()
|
||||
{
|
||||
update();
|
||||
return storage[output_idx];
|
||||
}
|
||||
// Returns true if a new value has been published by the consumer without
|
||||
// having been consumed yet.
|
||||
// Consumer-side only.
|
||||
bool updated()
|
||||
{
|
||||
return (shared_state.load(std::memory_order_relaxed) & BACK_DIRTY_BIT) != 0;
|
||||
}
|
||||
|
||||
private:
|
||||
// Publish a value to the consumer. Returns true if the data was overwritten
|
||||
// without having been read.
|
||||
bool publish()
|
||||
{
|
||||
auto former_back_idx = shared_state.exchange(input_idx | BACK_DIRTY_BIT,
|
||||
std::memory_order_acq_rel);
|
||||
input_idx = former_back_idx & BACK_INDEX_MASK;
|
||||
return (former_back_idx & BACK_DIRTY_BIT) != 0;
|
||||
}
|
||||
// Get a new value from the producer, if a new value has been produced.
|
||||
bool update()
|
||||
{
|
||||
bool was_updated = updated();
|
||||
if (was_updated) {
|
||||
auto former_back_idx =
|
||||
shared_state.exchange(output_idx, std::memory_order_acq_rel);
|
||||
output_idx = former_back_idx & BACK_INDEX_MASK;
|
||||
}
|
||||
return was_updated;
|
||||
}
|
||||
T storage[3];
|
||||
// Mask used to extract back-buffer index
|
||||
const uint8_t BACK_INDEX_MASK = 0b11;
|
||||
// Bit set by producer to signal updates
|
||||
const uint8_t BACK_DIRTY_BIT = 0b100;
|
||||
// Shared state: a dirty bit, and an index.
|
||||
std::atomic<uint8_t> shared_state = {0};
|
||||
// Output index, private to the consumer.
|
||||
uint8_t output_idx = 1;
|
||||
// Input index, private to the producer.
|
||||
uint8_t input_idx = 2;
|
||||
};
|
||||
|
||||
#endif // CUBEB_TRIPLE_BUFFER
|
|
@ -0,0 +1,67 @@
|
|||
/*
|
||||
* Copyright © 2022 Mozilla Foundation
|
||||
*
|
||||
* This program is made available under an ISC-style license. See the
|
||||
* accompanying file LICENSE for details.
|
||||
*/
|
||||
|
||||
/* cubeb_triple_buffer test */
|
||||
#include "gtest/gtest.h"
|
||||
#if !defined(_XOPEN_SOURCE)
|
||||
#define _XOPEN_SOURCE 600
|
||||
#endif
|
||||
#include "cubeb/cubeb.h"
|
||||
#include "cubeb_triple_buffer.h"
|
||||
#include <atomic>
|
||||
#include <math.h>
|
||||
#include <memory>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <thread>
|
||||
|
||||
#include "common.h"
|
||||
|
||||
TEST(cubeb, triple_buffer)
|
||||
{
|
||||
struct AB {
|
||||
uint64_t a;
|
||||
uint64_t b;
|
||||
};
|
||||
triple_buffer<AB> buffer;
|
||||
|
||||
std::atomic<bool> finished = {false};
|
||||
|
||||
ASSERT_TRUE(!buffer.updated());
|
||||
|
||||
auto t = std::thread([&finished, &buffer] {
|
||||
AB ab;
|
||||
ab.a = 0;
|
||||
ab.b = UINT64_MAX;
|
||||
uint64_t counter = 0;
|
||||
do {
|
||||
buffer.write(ab);
|
||||
ab.a++;
|
||||
ab.b--;
|
||||
} while (counter++ < 1e6 && ab.a <= UINT64_MAX && ab.b != 0);
|
||||
finished.store(true);
|
||||
});
|
||||
|
||||
AB ab;
|
||||
AB old_ab;
|
||||
old_ab.a = 0;
|
||||
old_ab.b = UINT64_MAX;
|
||||
|
||||
// Wait to have at least one value produced.
|
||||
while (!buffer.updated()) {
|
||||
}
|
||||
|
||||
// Check that the values are increasing (resp. descreasing) monotonically.
|
||||
while (!finished) {
|
||||
ab = buffer.read();
|
||||
ASSERT_GE(ab.a, old_ab.a);
|
||||
ASSERT_LE(ab.b, old_ab.b);
|
||||
old_ab = ab;
|
||||
}
|
||||
|
||||
t.join();
|
||||
}
|
Загрузка…
Ссылка в новой задаче