From 7f5589a431b166a3c37bb5d5e0614cf2b1bf6387 Mon Sep 17 00:00:00 2001 From: Paul Adenot Date: Tue, 29 Nov 2022 19:08:49 +0100 Subject: [PATCH] Allow dynamic enabling and disabling of the log, add tests for the logging system. Also clear out the memory used by the async logger when it's shut down. This relies on the fact that enabling and disabling logging is performed on the same thread, but I think this is a reasonnable thing to do. We could assert it though. --- CMakeLists.txt | 1 + src/cubeb.c | 2 + src/cubeb_log.cpp | 94 +++++++++++++++++------- test/test_logging.cpp | 161 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 231 insertions(+), 27 deletions(-) create mode 100644 test/test_logging.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 7ae58d4..e7de39f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -399,6 +399,7 @@ if(BUILD_TESTS) add_sanitizers(test_resampler) cubeb_add_test(duplex) + cubeb_add_test(logging) if (USE_WASAPI) cubeb_add_test(overload_callback) diff --git a/src/cubeb.c b/src/cubeb.c index ea21046..cf29e12 100644 --- a/src/cubeb.c +++ b/src/cubeb.c @@ -351,6 +351,8 @@ cubeb_destroy(cubeb * context) } context->ops->destroy(context); + + cubeb_set_log_callback(CUBEB_LOG_DISABLED, NULL); } int diff --git a/src/cubeb_log.cpp b/src/cubeb_log.cpp index 0d86518..fa48f90 100644 --- a/src/cubeb_log.cpp +++ b/src/cubeb_log.cpp @@ -65,47 +65,67 @@ public: void push(char const str[CUBEB_LOG_MESSAGE_MAX_SIZE]) { cubeb_log_message msg(str); - msg_queue.enqueue(msg); + msg_queue->enqueue(msg); } void run() { - std::thread([this]() { + assert(logging_thread.get_id() == std::thread::id()); + assert(msg_queue); + logging_thread = std::thread([this]() { CUBEB_REGISTER_THREAD("cubeb_log"); - while (true) { + while (!shutdown_thread) { cubeb_log_message msg; - while (msg_queue.dequeue(&msg, 1)) { + while (msg_queue->dequeue(&msg, 1)) { cubeb_log_internal_no_format(msg.get()); } -#ifdef _WIN32 - Sleep(CUBEB_LOG_BATCH_PRINT_INTERVAL_MS); -#else - timespec sleep_duration = sleep_for; - timespec remainder; - do { - if (nanosleep(&sleep_duration, &remainder) == 0 || errno != EINTR) { - break; - } - sleep_duration = remainder; - } while (remainder.tv_sec || remainder.tv_nsec); -#endif + std::this_thread::sleep_for( + std::chrono::milliseconds(CUBEB_LOG_BATCH_PRINT_INTERVAL_MS)); } CUBEB_UNREGISTER_THREAD(); - }).detach(); + }); } // Tell the underlying queue the producer thread has changed, so it does not // assert in debug. This should be called with the thread stopped. - void reset_producer_thread() { msg_queue.reset_thread_ids(); } + void reset_producer_thread() { msg_queue->reset_thread_ids(); } + void start() + { + msg_queue.reset( + new lock_free_queue(CUBEB_LOG_MESSAGE_QUEUE_DEPTH)); + shutdown_thread = false; + run(); + } + void stop() + { + shutdown_thread = true; + if (logging_thread.get_id() != std::thread::id()) { + logging_thread.join(); + logging_thread = std::thread(); + // This is OK, because at this point, we know the consumer has stopped + // consuming. + msg_queue->reset_thread_ids(); + purge_queue(); + msg_queue.reset(nullptr); + } + } + void purge_queue() + { + assert(logging_thread.get_id() == std::thread::id() && + "Only purge the async logger queue when the thread is stopped"); + if (!msg_queue) { + return; + } + cubeb_log_message msg; + while (msg_queue->dequeue(&msg, 1)) { /* nothing */ + } + } private: -#ifndef _WIN32 - const struct timespec sleep_for = { - CUBEB_LOG_BATCH_PRINT_INTERVAL_MS / 1000, - (CUBEB_LOG_BATCH_PRINT_INTERVAL_MS % 1000) * 1000 * 1000}; -#endif - cubeb_async_logger() : msg_queue(CUBEB_LOG_MESSAGE_QUEUE_DEPTH) { run(); } + cubeb_async_logger() {} /** This is quite a big data structure, but is only instantiated if the * asynchronous logger is used.*/ - lock_free_queue msg_queue; + std::unique_ptr> msg_queue; + std::atomic shutdown_thread = {false}; + std::thread logging_thread; }; void @@ -115,8 +135,8 @@ cubeb_log_internal(char const * file, uint32_t line, char const * fmt, ...) va_start(args, fmt); char msg[CUBEB_LOG_MESSAGE_MAX_SIZE]; vsnprintf(msg, CUBEB_LOG_MESSAGE_MAX_SIZE, fmt, args); - g_cubeb_log_callback.load()("%s:%d:%s", file, line, msg); va_end(args); + g_cubeb_log_callback.load()("%s:%d:%s", file, line, msg); } void @@ -148,11 +168,28 @@ cubeb_async_log_reset_threads(void) cubeb_async_logger::get().reset_producer_thread(); } +void +cubeb_noop_log_callback(char const * /* fmt */, ...) +{ +} + void cubeb_log_set(cubeb_log_level log_level, cubeb_log_callback log_callback) { g_cubeb_log_level = log_level; - g_cubeb_log_callback = log_callback; + // Once a callback has a been set, `g_cubeb_log_callback` is never set back to + // nullptr, to prevent a TOCTOU race between checking the pointer + if (log_callback && log_level != CUBEB_LOG_DISABLED) { + g_cubeb_log_callback = log_callback; + cubeb_async_logger::get().start(); + } else if (!log_callback || CUBEB_LOG_DISABLED) { + // This returns once the thread has joined. + cubeb_async_logger::get().stop(); + g_cubeb_log_callback = cubeb_noop_log_callback; + cubeb_async_logger::get().purge_queue(); + } else { + assert(false && "Incorrect parameters passed to cubeb_log_set"); + } } cubeb_log_level @@ -164,5 +201,8 @@ cubeb_log_get_level() cubeb_log_callback cubeb_log_get_callback() { + if (g_cubeb_log_callback == cubeb_noop_log_callback) { + return nullptr; + } return g_cubeb_log_callback; } diff --git a/test/test_logging.cpp b/test/test_logging.cpp new file mode 100644 index 0000000..fdbf8d4 --- /dev/null +++ b/test/test_logging.cpp @@ -0,0 +1,161 @@ +/* + * Copyright © 2016 Mozilla Foundation + * + * This program is made available under an ISC-style license. See the + * accompanying file LICENSE for details. + */ + +/* cubeb_logging test */ +#include "gtest/gtest.h" +#if !defined(_XOPEN_SOURCE) +#define _XOPEN_SOURCE 600 +#endif +#include "cubeb/cubeb.h" +#include +#include +#include +#include +#include + +#include "common.h" + +#define PRINT_LOGS_TO_STDERR 0 + +#define SAMPLE_FREQUENCY 48000 +#define STREAM_FORMAT CUBEB_SAMPLE_FLOAT32LE +#define OUTPUT_CHANNELS 2 +#define OUTPUT_LAYOUT CUBEB_LAYOUT_STEREO + +std::atomic log_statements_received = {0}; +std::atomic data_callback_call_count = {0}; + +void +test_logging_callback(char const * fmt, ...) +{ + log_statements_received++; +#if PRINT_LOGS_TO_STDERR == 1 + char buf[1024]; + va_list argslist; + va_start(argslist, fmt); + vsnprintf(buf, 1024, fmt, argslist); + fprintf(stderr, "%s\n", buf); + va_end(argslist); +#endif // PRINT_LOGS_TO_STDERR +} + +long +data_cb_load(cubeb_stream * stream, void * user, const void * inputbuffer, + void * outputbuffer, long nframes) +{ + data_callback_call_count++; + return nframes; +} + +void +state_cb(cubeb_stream * stream, void * /*user*/, cubeb_state state) +{ + if (stream == NULL) + return; + + switch (state) { + case CUBEB_STATE_STARTED: + fprintf(stderr, "stream started\n"); + break; + case CUBEB_STATE_STOPPED: + fprintf(stderr, "stream stopped\n"); + break; + case CUBEB_STATE_DRAINED: + fprintf(stderr, "stream drained\n"); + break; + default: + fprintf(stderr, "unknown stream state %d\n", state); + } + + return; +} + +// Waits for at least one audio callback to have occured. +void wait_for_audio_callback() { + uint32_t audio_callback_index = + data_callback_call_count.load(std::memory_order_acquire); + while (audio_callback_index == + data_callback_call_count.load(std::memory_order_acquire)) { + delay(10); + } +} + +TEST(cubeb, logging) +{ + cubeb * ctx; + cubeb_stream * stream; + cubeb_stream_params output_params; + int r; + uint32_t latency_frames = 0; + + cubeb_set_log_callback(CUBEB_LOG_NORMAL, test_logging_callback); + + r = common_init(&ctx, "Cubeb logging test"); + ASSERT_EQ(r, CUBEB_OK) << "Error initializing cubeb library"; + + std::unique_ptr cleanup_cubeb_at_exit( + ctx, cubeb_destroy); + + output_params.format = STREAM_FORMAT; + output_params.rate = SAMPLE_FREQUENCY; + output_params.channels = OUTPUT_CHANNELS; + output_params.layout = OUTPUT_LAYOUT; + output_params.prefs = CUBEB_STREAM_PREF_NONE; + + r = cubeb_get_min_latency(ctx, &output_params, &latency_frames); + ASSERT_EQ(r, CUBEB_OK) << "Could not get minimal latency"; + + r = cubeb_stream_init(ctx, &stream, "Cubeb logging", NULL, NULL, NULL, + &output_params, latency_frames, data_cb_load, state_cb, + NULL); + ASSERT_EQ(r, CUBEB_OK) << "Error initializing cubeb stream"; + + std::unique_ptr + cleanup_stream_at_exit(stream, cubeb_stream_destroy); + + ASSERT_NE(log_statements_received.load(std::memory_order_acquire), 0u); + + cubeb_set_log_callback(CUBEB_LOG_DISABLED, nullptr); + log_statements_received.store(0, std::memory_order_release); + + // This is synchronous and we'll receive log messages on all backends that we + // test + cubeb_stream_start(stream); + + ASSERT_EQ(log_statements_received.load(std::memory_order_acquire), 0u); + + cubeb_set_log_callback(CUBEB_LOG_VERBOSE, test_logging_callback); + + wait_for_audio_callback(); + + ASSERT_NE(log_statements_received.load(std::memory_order_acquire), 0u); + + bool log_callback_set = true; + uint32_t iterations = 100; + while (iterations--) { + wait_for_audio_callback(); + + if (!log_callback_set) { + ASSERT_EQ(log_statements_received.load(std::memory_order_acquire), 0u); + // Set a logging callback, start logging + cubeb_set_log_callback(CUBEB_LOG_VERBOSE, test_logging_callback); + log_callback_set = true; + } else { + // Disable the logging callback, stop logging. + ASSERT_NE(log_statements_received.load(std::memory_order_acquire), 0u); + cubeb_set_log_callback(CUBEB_LOG_DISABLED, nullptr); + log_statements_received.store(0, std::memory_order_release); + // Disabling logging should flush any log message -- wait a bit and check + // that this is true. + // delay(100); + ASSERT_EQ(log_statements_received.load(std::memory_order_acquire), 0u); + log_callback_set = false; + } + } + + cubeb_stream_stop(stream); +}