AAudio: Use condition variable instead of sleeping

This removes the constant checking for state changes with
5ms of sleep in between for the state thread.
We need a new thread to wakeup the state thread reliably
without blocking in the audio thread. For a more detailed
and theoretical explanation of the problem and solution (specifically
written for this commit), see:
https://nyorain.github.io/lock-free-wakeup.html

Now, will only do this time-based sleeping when actively waiting
for a state change. We can't implement that with a blocking call to
AAudioStream_waitForStateChange since that can't be woken up
and we furthermore might have to wait for multiple streams at once.
This also fixes some issues and race conditions with stream destruction
and adds some more documentation.
This commit is contained in:
nyorain 2019-12-08 00:19:10 +01:00 коммит произвёл Paul Adenot
Родитель efae368bcf
Коммит 687fa39311
1 изменённых файлов: 225 добавлений и 52 удалений

Просмотреть файл

@ -103,6 +103,7 @@ struct cubeb_stream {
/**/
_Atomic bool in_use;
_Atomic enum stream_state state;
AAudioStream * ostream;
AAudioStream * istream;
@ -110,10 +111,12 @@ struct cubeb_stream {
cubeb_state_callback state_callback;
cubeb_resampler * resampler;
_Atomic enum stream_state state;
void * in_buf;
// mutex synchronizes access to the stream from the state thread
// and user-called functions. Everything that is accessed in the
// aaudio data (or error) callback is synchronized only via atomics.
pthread_mutex_t mutex;
void * in_buf;
unsigned in_frame_size;
cubeb_sample_format out_format;
@ -126,14 +129,22 @@ struct cubeb {
struct cubeb_ops const * ops;
void * libaaudio;
_Atomic bool state_join;
pthread_t state_thread;
struct {
// The state thread: it waits for state changes and stops
// drained streams.
pthread_t thread;
pthread_t notifier;
pthread_mutex_t mutex;
pthread_cond_t cond;
_Atomic bool join;
_Atomic bool waiting;
} state;
pthread_mutex_t mutex;
// streams[i].in_use signals whether a stream is used
struct cubeb_stream streams[MAX_STREAMS];
};
// Only allowed from state thread
// Only allowed from state thread, while mutex on stm is locked
static void shutdown(cubeb_stream* stm)
{
if (stm->istream) {
@ -147,15 +158,34 @@ static void shutdown(cubeb_stream* stm)
atomic_store(&stm->state, STREAM_STATE_SHUTDOWN);
}
// Returns whether the given state is one in which we wait for
// an asynchronous change
static bool waiting_state(enum stream_state state)
{
switch(state) {
case STREAM_STATE_DRAINING:
case STREAM_STATE_STARTING:
case STREAM_STATE_STOPPING:
return true;
default:
return false;
}
}
// Returns whether this stream is still waiting for a state change
static void update_state(cubeb_stream * stm)
{
if (!atomic_load(&stm->in_use) || atomic_load(&stm->state) == STREAM_STATE_INIT) {
// fast path for streams that don't wait for state change or are invalid
enum stream_state old_state = atomic_load(&stm->state);
if (old_state == STREAM_STATE_INIT ||
old_state == STREAM_STATE_STARTED ||
old_state == STREAM_STATE_STOPPED ||
old_state == STREAM_STATE_SHUTDOWN) {
return;
}
// Don't wait to stream operations in the main thread.
// If an operation on a stream takes a while, the state thread
// can still continue update the other streams.
// If the main thread currently operates on this thread, we don't
// have to wait for it
int err = pthread_mutex_trylock(&stm->mutex);
if (err != 0) {
if (err != EBUSY) {
@ -164,14 +194,22 @@ static void update_state(cubeb_stream * stm)
return;
}
aaudio_result_t res;
enum stream_state old_state = atomic_load(&stm->state);
enum stream_state new_state;
// check again: if this is true now, the stream was destroyed or
// changed between our fast path check and locking the mutex
old_state = atomic_load(&stm->state);
if (old_state == STREAM_STATE_INIT ||
old_state == STREAM_STATE_STARTED ||
old_state == STREAM_STATE_STOPPED ||
old_state == STREAM_STATE_SHUTDOWN) {
pthread_mutex_unlock(&stm->mutex);
return;
}
// We compute the new state the stream has and then compare_exchange it
// if it has changed. This way we will never just overwrite state
// changes that were set from the audio thread in the meantime,
// such as a draining or error state.
enum stream_state new_state;
do {
if (old_state == STREAM_STATE_SHUTDOWN) {
pthread_mutex_unlock(&stm->mutex);
@ -189,6 +227,7 @@ static void update_state(cubeb_stream * stm)
aaudio_stream_state_t istate = 0;
aaudio_stream_state_t ostate = 0;
aaudio_result_t res;
if (stm->istream) {
res = WRAP(AAudioStream_waitForStateChange)(stm->istream,
AAUDIO_STREAM_STATE_UNKNOWN, &istate, 0);
@ -239,9 +278,6 @@ static void update_state(cubeb_stream * stm)
}
switch (old_state) {
case STREAM_STATE_STOPPED:
case STREAM_STATE_STARTED:
break;
case STREAM_STATE_STARTING:
if ((!istate || istate == AAUDIO_STREAM_STATE_STARTED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_STARTED)) {
@ -311,23 +347,71 @@ static void update_state(cubeb_stream * stm)
pthread_mutex_unlock(&stm->mutex);
}
static void * notifier_thread(void * user_ptr)
{
cubeb * ctx = (cubeb*) user_ptr;
pthread_mutex_lock(&ctx->state.mutex);
while (!atomic_load(&ctx->state.join)) {
pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
if (atomic_load(&ctx->state.waiting)) {
pthread_cond_signal(&ctx->state.cond);
}
}
// make sure other thread joins as well
pthread_cond_signal(&ctx->state.cond);
pthread_mutex_unlock(&ctx->state.mutex);
LOG("Exiting notifier thread");
return NULL;
}
static void * state_thread(void * user_ptr)
{
cubeb * ctx = (cubeb*) user_ptr;
while (!atomic_load(&ctx->state_join)) {
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
cubeb_stream* stm = &ctx->streams[i];
update_state(stm);
pthread_mutex_lock(&ctx->state.mutex);
bool waiting = false;
while (!atomic_load(&ctx->state.join)) {
waiting |= atomic_load(&ctx->state.waiting);
if (waiting) {
atomic_store(&ctx->state.waiting, false);
waiting = false;
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
cubeb_stream* stm = &ctx->streams[i];
update_state(stm);
waiting |= waiting_state(atomic_load(&stm->state));
}
// state changed from another thread, update again immediately
if(atomic_load(&ctx->state.waiting)) {
waiting = true;
continue;
}
// Not waiting for any change anymore: we can wait on the
// condition variable without timeout
if (!waiting) {
continue;
}
// while any stream is waiting for state change we sleep with regular
// timeouts. But we wake up immediately if signaled.
// This might seem like a poor man's implementation of state change
// waiting but (as of december 2019), the implementation of
// AAudioStream_waitForStateChange is pretty much the same:
// https://android.googlesource.com/platform/frameworks/av/+/refs/heads/master/media/libaaudio/src/core/AudioStream.cpp#277
struct timespec timeout;
clock_gettime(CLOCK_MONOTONIC, &timeout);
timeout.tv_nsec += 5 * 1000 * 1000; // wait 5ms
pthread_cond_timedwait(&ctx->state.cond, &ctx->state.mutex, &timeout);
} else {
pthread_cond_wait(&ctx->state.cond, &ctx->state.mutex);
}
struct timespec ts = {
.tv_nsec = 1000 * 1000 * 5,
.tv_sec = 0,
};
nanosleep(&ts, NULL); // wait 5ms
}
// make sure other thread joins as well
pthread_cond_signal(&ctx->state.cond);
pthread_mutex_unlock(&ctx->state.mutex);
LOG("Exiting state thread");
return NULL;
}
@ -351,13 +435,34 @@ aaudio_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
static void
aaudio_destroy(cubeb * ctx)
{
atomic_store(&ctx->state_join, true);
int err = pthread_join(ctx->state_thread, NULL);
#ifndef NDEBUG
// make sure all streams were destroyed
for(unsigned i = 0u; i < MAX_STREAMS; ++i) {
assert(!atomic_load(&ctx->streams[i].in_use));
}
#endif
// broadcast joining to both threads
// they will additionally signal each other before joining
atomic_store(&ctx->state.join, true);
pthread_cond_broadcast(&ctx->state.cond);
int err;
err = pthread_join(ctx->state.thread, NULL);
if (err != 0) {
LOG("pthread_join: %s", strerror(err));
}
pthread_mutex_destroy(&ctx->mutex);
err = pthread_join(ctx->state.notifier, NULL);
if (err != 0) {
LOG("pthread_join: %s", strerror(err));
}
pthread_cond_destroy(&ctx->state.cond);
pthread_mutex_destroy(&ctx->state.mutex);
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
pthread_mutex_destroy(&ctx->streams[i].mutex);
}
if (ctx->libaaudio) {
dlclose(ctx->libaaudio);
@ -368,10 +473,12 @@ aaudio_destroy(cubeb * ctx)
/*static*/ int
aaudio_init(cubeb ** context, char const * context_name) {
(void) context_name;
int err;
// load api
void * libaaudio = NULL;
#ifndef DISABLE_LIBAAUDIO_DLOPEN
void * libaaudio = dlopen("libaaudio.so", RTLD_NOW);
libaaudio = dlopen("libaaudio.so", RTLD_NOW);
if (!libaaudio) {
return CUBEB_ERROR;
}
@ -392,10 +499,46 @@ aaudio_init(cubeb ** context, char const * context_name) {
cubeb* ctx = (cubeb*) calloc(1, sizeof(*ctx));
ctx->ops = &aaudio_ops;
ctx->libaaudio = libaaudio;
atomic_store(&ctx->state_join, false);
atomic_init(&ctx->state.join, false);
atomic_init(&ctx->state.waiting, false);
pthread_mutex_init(&ctx->state.mutex, NULL);
pthread_mutex_init(&ctx->mutex, NULL);
pthread_create(&ctx->state_thread, NULL, state_thread, ctx);
pthread_condattr_t cond_attr;
pthread_condattr_init(&cond_attr);
pthread_condattr_setclock(&cond_attr, CLOCK_MONOTONIC);
err = pthread_cond_init(&ctx->state.cond, &cond_attr);
if (err) {
LOG("pthread_cond_init: %s", strerror(err));
aaudio_destroy(ctx);
return CUBEB_ERROR;
}
// The stream mutexes are not bound to the lifetimes of the
// streams since we need them to synchronize the streams with
// the state thread.
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
atomic_init(&ctx->streams[i].in_use, false);
atomic_init(&ctx->streams[i].state, STREAM_STATE_INIT);
pthread_mutex_init(&ctx->streams[i].mutex, NULL);
}
err = pthread_create(&ctx->state.thread, NULL, state_thread, ctx);
if (err != 0) {
LOG("pthread_create: %s", strerror(err));
aaudio_destroy(ctx);
return CUBEB_ERROR;
}
// TODO: we could set the priority of the notifier thread lower than
// the priority of the state thread. This way, it's more likely
// that the state thread will be woken up by the condition variable signal
// when both are currently waiting
err = pthread_create(&ctx->state.notifier, NULL, notifier_thread, ctx);
if (err != 0) {
LOG("pthread_create: %s", strerror(err));
aaudio_destroy(ctx);
return CUBEB_ERROR;
}
*context = ctx;
return CUBEB_OK;
@ -487,6 +630,8 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
atomic_store(&stm->context->state.waiting, true);
pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@ -529,6 +674,8 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
return AAUDIO_CALLBACK_RESULT_STOP;
} else if (done_frames < num_frames) {
atomic_store(&stm->state, STREAM_STATE_DRAINING);
atomic_store(&stm->context->state.waiting, true);
pthread_cond_signal(&stm->context->state.cond);
char* begin = ((char*)audio_data) + done_frames * stm->out_frame_size;
memset(begin, 0x0, (num_frames - done_frames) * stm->out_frame_size);
@ -574,6 +721,8 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
// stop it from the state thread. That is signaled via the
// DRAINING state.
atomic_store(&stm->state, STREAM_STATE_DRAINING);
atomic_store(&stm->context->state.waiting, true);
pthread_cond_signal(&stm->context->state.cond);
}
return AAUDIO_CALLBACK_RESULT_CONTINUE;
@ -686,10 +835,9 @@ aaudio_stream_destroy(cubeb_stream * stm)
free(stm->in_buf);
}
atomic_store(&stm->in_use, false);
atomic_store(&stm->state, STREAM_STATE_INIT);
pthread_mutex_unlock(&stm->mutex);
pthread_mutex_destroy(&stm->mutex);
atomic_store(&stm->in_use, false);
}
static int
@ -718,20 +866,28 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
// find a free stream
// atomically find a free stream.
cubeb_stream * stm = NULL;
{
pthread_mutex_lock(&ctx->mutex);
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
if (!atomic_load(&ctx->streams[i].in_use)) {
stm = &ctx->streams[i];
break;
}
for (unsigned i = 0u; i < MAX_STREAMS; ++i) {
// This check is only an optimization, we don't strictly need it
// since we check again after locking the mutex.
if (atomic_load(&ctx->streams[i].in_use)) {
continue;
}
atomic_store(&stm->state, STREAM_STATE_INIT);
atomic_store(&stm->in_use, true);
pthread_mutex_unlock(&ctx->mutex);
// if this fails with EBUSY, another thread initialized this stream
// between our check of in_use and this.
int err = pthread_mutex_trylock(&ctx->streams[i].mutex);
if (err != 0 || atomic_load(&ctx->streams[i].in_use)) {
if (err && err != EBUSY) {
LOG("pthread_mutex_trylock: %s", strerror(err));
}
continue;
}
stm = &ctx->streams[i];
break;
}
if (!stm) {
@ -739,10 +895,10 @@ aaudio_stream_init(cubeb * ctx,
return CUBEB_ERROR;
}
unsigned res_err = CUBEB_ERROR;
pthread_mutex_init(&stm->mutex, NULL);
pthread_mutex_lock(&stm->mutex);
assert(atomic_load(&stm->state) == STREAM_STATE_INIT);
atomic_store(&stm->in_use, true);
unsigned res_err = CUBEB_ERROR;
stm->user_ptr = user_ptr;
stm->data_callback = data_callback;
stm->state_callback = state_callback;
@ -860,6 +1016,9 @@ aaudio_stream_init(cubeb * ctx,
goto error;
}
// the stream isn't started initially. We don't need to differntiate
// between a stream that was just initialized and one that played
// already but was stopped
atomic_store(&stm->state, STREAM_STATE_STOPPED);
LOG("Cubeb stream (%p) init success", (void*) stm);
pthread_mutex_unlock(&stm->mutex);
@ -948,7 +1107,8 @@ aaudio_stream_start(cubeb_stream * stm)
}
int ret = CUBEB_OK;
while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING)) {
bool success;
while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STARTING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@ -981,6 +1141,11 @@ aaudio_stream_start(cubeb_stream * stm)
break;
}
if(success) {
atomic_store(&stm->context->state.waiting, true);
pthread_cond_signal(&stm->context->state.cond);
}
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@ -1061,7 +1226,8 @@ aaudio_stream_stop(cubeb_stream * stm)
}
int ret = CUBEB_OK;
while (!atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING)) {
bool success;
while (!(success = atomic_compare_exchange_strong(&stm->state, &state, STREAM_STATE_STOPPING))) {
// we land here only if the state has changed in the meantime
switch (state) {
// If an error ocurred in the meantime, we can't change that.
@ -1095,6 +1261,11 @@ aaudio_stream_stop(cubeb_stream * stm)
break;
}
if(success) {
atomic_store(&stm->context->state.waiting, true);
pthread_cond_signal(&stm->context->state.cond);
}
pthread_mutex_unlock(&stm->mutex);
return ret;
}
@ -1178,6 +1349,8 @@ static const struct cubeb_ops aaudio_ops = {
.stream_stop = aaudio_stream_stop,
.stream_reset_default_device = NULL,
.stream_get_position = aaudio_stream_get_position,
// NOTE: this could be implemented via means comparable to the
// OpenSLES backend
.stream_get_latency = NULL,
.stream_set_volume = aaudio_stream_set_volume,
.stream_get_current_device = NULL,