wasapi: make render thread exist for the lifetime of the stream object

- Converts cubeb_stream into a reference-counted object, and gets rid of
  emergency_bailout logic
- When reconfiguring, only restart a stream if it was already started
- Removes timeout inside the render thread, as we can't guarantee events
  while a stream is stopped

Subsequently rebased by Paul Adenot <padenot@mozilla.com>
This commit is contained in:
Michael M 2023-05-02 12:06:28 +02:00 коммит произвёл Paul Adenot
Родитель 1ba9237364
Коммит febf49d089
1 изменённых файлов: 140 добавлений и 134 удалений

Просмотреть файл

@ -110,7 +110,9 @@ struct com_heap_ptr_deleter {
template <typename T>
using com_heap_ptr = std::unique_ptr<T, com_heap_ptr_deleter>;
template <typename T, size_t N> constexpr size_t ARRAY_LENGTH(T (&)[N])
template <typename T, size_t N>
constexpr size_t
ARRAY_LENGTH(T (&)[N])
{
return N;
}
@ -188,6 +190,20 @@ private:
T * ptr = nullptr;
};
LONG
wasapi_stream_add_ref(cubeb_stream * stm);
LONG
wasapi_stream_release(cubeb_stream * stm);
struct auto_stream_ref {
auto_stream_ref(cubeb_stream * stm_) : stm(stm_)
{
wasapi_stream_add_ref(stm);
}
~auto_stream_ref() { wasapi_stream_release(stm); }
cubeb_stream * stm;
};
extern cubeb_ops const wasapi_ops;
static com_heap_ptr<wchar_t>
@ -377,8 +393,8 @@ struct cubeb_stream {
com_ptr<IAudioClient> input_client;
/* Interface to use the event driven capture interface */
com_ptr<IAudioCaptureClient> capture_client;
/* This event is set by the stream_stop and stream_destroy
function, so the render loop can exit properly. */
/* This event is set by the stream_destroy function, so the render loop can
exit properly. */
HANDLE shutdown_event = 0;
/* Set by OnDefaultDeviceChanged when a stream reconfiguration is required.
The reconfiguration is handled by the render loop thread. */
@ -422,17 +438,20 @@ struct cubeb_stream {
float volume = 1.0;
/* True if the stream is draining. */
bool draining = false;
/* If the render thread fails to stop, this is set to true and ownership of
* the stm is "leaked" to the render thread for later cleanup. */
std::atomic<bool> emergency_bailout{false};
/* This needs an active audio input stream to be known, and is updated in the
* first audio input callback. */
std::atomic<int64_t> input_latency_hns{LATENCY_NOT_AVAILABLE_YET};
/* Those attributes count the number of frames requested (resp. received) by
the OS, to be able to detect drifts. This is only used for logging for now. */
size_t total_input_frames = 0;
size_t total_output_frames = 0;
/* This is set by the render loop thread once it has obtained a reference to
* COM and this stream object. */
HANDLE thread_ready_event = 0;
/* Keep a ref count on this stream object. After both stream_destroy has been
* called and the render loop thread has exited, destroy this stream object.
*/
LONG ref_count = 0;
};
class monitor_device_notifications {
@ -787,9 +806,6 @@ wasapi_data_callback(cubeb_stream * stm, void * user_ptr,
void const * input_buffer, void * output_buffer,
long nframes)
{
if (stm->emergency_bailout) {
return CUBEB_ERROR;
}
return stm->data_callback(stm, user_ptr, input_buffer, output_buffer,
nframes);
}
@ -797,9 +813,6 @@ wasapi_data_callback(cubeb_stream * stm, void * user_ptr,
void
wasapi_state_callback(cubeb_stream * stm, void * user_ptr, cubeb_state state)
{
if (stm->emergency_bailout) {
return;
}
return stm->state_callback(stm, user_ptr, state);
}
@ -1353,31 +1366,12 @@ refill_callback_output(cubeb_stream * stm)
void
wasapi_stream_destroy(cubeb_stream * stm);
static void
handle_emergency_bailout(cubeb_stream * stm)
{
if (stm->emergency_bailout) {
CloseHandle(stm->thread);
stm->thread = NULL;
CloseHandle(stm->shutdown_event);
stm->shutdown_event = 0;
wasapi_stream_destroy(stm);
_endthreadex(0);
}
}
static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
{
AutoRegisterThread raii("cubeb rendering thread");
cubeb_stream * stm = static_cast<cubeb_stream *>(stream);
bool is_playing = true;
HANDLE wait_array[4] = {stm->shutdown_event, stm->reconfigure_event,
stm->refill_event, stm->input_available_event};
HANDLE mmcss_handle = NULL;
HRESULT hr = 0;
DWORD mmcss_task_index = 0;
auto_stream_ref stream_ref(stm);
struct auto_com {
auto_com()
{
@ -1387,6 +1381,21 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
~auto_com() { CoUninitialize(); }
} com;
bool is_playing = true;
HANDLE wait_array[4] = {stm->shutdown_event, stm->reconfigure_event,
stm->refill_event, stm->input_available_event};
HANDLE mmcss_handle = NULL;
HRESULT hr = 0;
DWORD mmcss_task_index = 0;
// Signal wasapi_stream_start that we've initialized COM and incremented
// the stream's ref_count.
BOOL ok = SetEvent(stm->thread_ready_event);
if (!ok) {
LOG("thread_ready SetEvent failed: %lx", GetLastError());
return 0;
}
/* We could consider using "Pro Audio" here for WebAudio and
maybe WebRTC. */
mmcss_handle = AvSetMmThreadCharacteristicsA("Audio", &mmcss_task_index);
@ -1396,20 +1405,9 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
GetLastError());
}
/* WaitForMultipleObjects timeout can trigger in cases where we don't want to
treat it as a timeout, such as across a system sleep/wake cycle. Trigger
the timeout error handling only when the timeout_limit is reached, which is
reset on each successful loop. */
unsigned timeout_count = 0;
const unsigned timeout_limit = 3;
while (is_playing) {
handle_emergency_bailout(stm);
DWORD waitResult = WaitForMultipleObjects(ARRAY_LENGTH(wait_array),
wait_array, FALSE, 1000);
handle_emergency_bailout(stm);
if (waitResult != WAIT_TIMEOUT) {
timeout_count = 0;
}
wait_array, FALSE, INFINITE);
switch (waitResult) {
case WAIT_OBJECT_0: { /* shutdown */
is_playing = false;
@ -1424,12 +1422,13 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
XASSERT(stm->output_client || stm->input_client);
LOG("Reconfiguring the stream");
/* Close the stream */
bool was_running = false;
if (stm->output_client) {
stm->output_client->Stop();
was_running = stm->output_client->Stop() == S_OK;
LOG("Output stopped.");
}
if (stm->input_client) {
stm->input_client->Stop();
was_running = stm->input_client->Stop() == S_OK;
LOG("Input stopped.");
}
{
@ -1450,7 +1449,7 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
LOG("Stream setup successfuly.");
}
XASSERT(stm->output_client || stm->input_client);
if (stm->output_client) {
if (was_running && stm->output_client) {
hr = stm->output_client->Start();
if (FAILED(hr)) {
LOG("Error starting output after reconfigure, error: %lx", hr);
@ -1459,7 +1458,7 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
}
LOG("Output started after reconfigure.");
}
if (stm->input_client) {
if (was_running && stm->input_client) {
hr = stm->input_client->Start();
if (FAILED(hr)) {
LOG("Error starting input after reconfiguring, error: %lx", hr);
@ -1488,14 +1487,6 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
break;
}
case WAIT_TIMEOUT:
XASSERT(stm->shutdown_event == wait_array[0]);
if (++timeout_count >= timeout_limit) {
LOG("Render loop reached the timeout limit.");
is_playing = false;
hr = E_FAIL;
}
break;
default:
LOG("case %lu not handled in render loop.", waitResult);
XASSERT(false);
@ -1515,8 +1506,6 @@ static unsigned int __stdcall wasapi_stream_render_loop(LPVOID stream)
AvRevertMmThreadCharacteristics(mmcss_handle);
}
handle_emergency_bailout(stm);
if (FAILED(hr)) {
wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
}
@ -1772,24 +1761,16 @@ namespace {
enum ShutdownPhase { OnStop, OnDestroy };
bool
stop_and_join_render_thread(cubeb_stream * stm, ShutdownPhase phase)
stop_and_join_render_thread(cubeb_stream * stm)
{
// Only safe to transfer `stm` ownership to the render thread when
// the stream is being destroyed by the caller.
bool bailout = phase == OnDestroy;
LOG("%p: Stop and join render thread: %p (%d), phase=%d", stm, stm->thread,
stm->emergency_bailout.load(), static_cast<int>(phase));
LOG("%p: Stop and join render thread: %p", stm, stm->thread);
if (!stm->thread) {
return true;
}
XASSERT(!stm->emergency_bailout);
BOOL ok = SetEvent(stm->shutdown_event);
if (!ok) {
LOG("stop_and_join_render_thread: SetEvent failed: %lx", GetLastError());
stm->emergency_bailout = bailout;
return false;
}
@ -1807,19 +1788,9 @@ stop_and_join_render_thread(cubeb_stream * stm, ShutdownPhase phase)
LOG("stop_and_join_render_thread: WaitForSingleObject on thread failed: "
"%lx, %lx",
r, GetLastError());
stm->emergency_bailout = bailout;
return false;
}
// Only attempt to close and null out the thread and event if the
// WaitForSingleObject above succeeded.
LOG("stop_and_join_render_thread: Closing thread.");
CloseHandle(stm->thread);
stm->thread = NULL;
CloseHandle(stm->shutdown_event);
stm->shutdown_event = 0;
return true;
}
@ -2725,8 +2696,8 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream,
return CUBEB_ERROR_INVALID_FORMAT;
}
std::unique_ptr<cubeb_stream, decltype(&wasapi_stream_destroy)> stm(
new cubeb_stream(), wasapi_stream_destroy);
cubeb_stream * stm = new cubeb_stream();
auto_stream_ref stream_ref(stm);
stm->context = context;
stm->data_callback = data_callback;
@ -2798,12 +2769,24 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream,
return CUBEB_ERROR;
}
stm->shutdown_event = CreateEvent(NULL, 0, 0, NULL);
if (!stm->shutdown_event) {
LOG("Can't create the shutdown event, error: %lx", GetLastError());
return CUBEB_ERROR;
}
stm->thread_ready_event = CreateEvent(NULL, 0, 0, NULL);
if (!stm->thread_ready_event) {
LOG("Can't create the thread ready event, error: %lx", GetLastError());
return CUBEB_ERROR;
}
{
/* Locking here is not strictly necessary, because we don't have a
notification client that can reset the stream yet, but it lets us
assert that the lock is held in the function. */
auto_lock lock(stm->stream_reset_lock);
rv = setup_wasapi_stream(stm.get());
rv = setup_wasapi_stream(stm);
}
if (rv != CUBEB_OK) {
return rv;
@ -2818,7 +2801,7 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream,
!(output_stream_params->prefs &
CUBEB_STREAM_PREF_DISABLE_DEVICE_SWITCHING))) {
LOG("Follow the system default input or/and output devices");
HRESULT hr = register_notification_client(stm.get());
HRESULT hr = register_notification_client(stm);
if (FAILED(hr)) {
/* this is not fatal, we can still play audio, but we won't be able
to keep using the default audio endpoint if it changes. */
@ -2826,7 +2809,24 @@ wasapi_stream_init(cubeb * context, cubeb_stream ** stream,
}
}
*stream = stm.release();
cubeb_async_log_reset_threads();
stm->thread =
(HANDLE)_beginthreadex(NULL, 512 * 1024, wasapi_stream_render_loop, stm,
STACK_SIZE_PARAM_IS_A_RESERVATION, NULL);
if (stm->thread == NULL) {
LOG("could not create WASAPI render thread.");
return CUBEB_ERROR;
}
// Wait for the wasapi_stream_render_loop thread to signal that COM has been
// initialized and the stream's ref_count has been incremented.
hr = WaitForSingleObject(stm->thread_ready_event, INFINITE);
XASSERT(hr == WAIT_OBJECT_0);
CloseHandle(stm->thread_ready_event);
stm->thread_ready_event = 0;
wasapi_stream_add_ref(stm);
*stream = stm;
LOG("Stream init successful (%p)", *stream);
return CUBEB_OK;
@ -2866,32 +2866,59 @@ close_wasapi_stream(cubeb_stream * stm)
}
}
LONG
wasapi_stream_add_ref(cubeb_stream * stm)
{
XASSERT(stm);
LONG result = InterlockedIncrement(&stm->ref_count);
LOGV("Stream ref count incremented = %i (%p)", result, stm);
return result;
}
LONG
wasapi_stream_release(cubeb_stream * stm)
{
XASSERT(stm);
LONG result = InterlockedDecrement(&stm->ref_count);
LOGV("Stream ref count decremented = %i (%p)", result, stm);
if (result == 0) {
LOG("Stream ref count hit zero, destroying (%p)", stm);
if (stm->notification_client) {
unregister_notification_client(stm);
}
CloseHandle(stm->shutdown_event);
CloseHandle(stm->reconfigure_event);
CloseHandle(stm->refill_event);
CloseHandle(stm->input_available_event);
CloseHandle(stm->thread);
// The variables intialized in wasapi_stream_init,
// must be destroyed in wasapi_stream_release.
stm->linear_input_buffer.reset();
{
auto_lock lock(stm->stream_reset_lock);
close_wasapi_stream(stm);
}
delete stm;
}
return result;
}
void
wasapi_stream_destroy(cubeb_stream * stm)
{
XASSERT(stm);
LOG("Stream destroy (%p)", stm);
LOG("Stream destroy called, decrementing ref count (%p)", stm);
if (!stop_and_join_render_thread(stm, OnDestroy)) {
// Emergency bailout: render thread becomes responsible for calling
// wasapi_stream_destroy.
return;
}
if (stm->notification_client) {
unregister_notification_client(stm);
}
{
auto_lock lock(stm->stream_reset_lock);
close_wasapi_stream(stm);
}
CloseHandle(stm->reconfigure_event);
CloseHandle(stm->refill_event);
CloseHandle(stm->input_available_event);
delete stm;
stop_and_join_render_thread(stm);
wasapi_stream_release(stm);
}
enum StreamDirection { OUTPUT, INPUT };
@ -2899,6 +2926,7 @@ enum StreamDirection { OUTPUT, INPUT };
int
stream_start_one_side(cubeb_stream * stm, StreamDirection dir)
{
XASSERT(stm);
XASSERT((dir == OUTPUT && stm->output_client) ||
(dir == INPUT && stm->input_client));
@ -2942,7 +2970,7 @@ wasapi_stream_start(cubeb_stream * stm)
{
auto_lock lock(stm->stream_reset_lock);
XASSERT(stm && !stm->thread && !stm->shutdown_event);
XASSERT(stm);
XASSERT(stm->output_client || stm->input_client);
if (stm->output_client) {
@ -2959,24 +2987,7 @@ wasapi_stream_start(cubeb_stream * stm)
}
}
stm->shutdown_event = CreateEvent(NULL, 0, 0, NULL);
if (!stm->shutdown_event) {
LOG("Can't create the shutdown event, error: %lx", GetLastError());
return CUBEB_ERROR;
}
cubeb_async_log_reset_threads();
stm->thread =
(HANDLE)_beginthreadex(NULL, 512 * 1024, wasapi_stream_render_loop, stm,
STACK_SIZE_PARAM_IS_A_RESERVATION, NULL);
if (stm->thread == NULL) {
LOG("could not create WASAPI render thread.");
CloseHandle(stm->shutdown_event);
stm->shutdown_event = 0;
return CUBEB_ERROR;
}
wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED);
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STARTED);
return CUBEB_OK;
}
@ -3009,12 +3020,6 @@ wasapi_stream_stop(cubeb_stream * stm)
wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED);
}
if (!stop_and_join_render_thread(stm, OnStop)) {
// If we could not join the thread, put the stream in error.
wasapi_state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
return CUBEB_ERROR;
}
return CUBEB_OK;
}
@ -3147,8 +3152,9 @@ wstr_to_utf8(LPCWSTR str)
return ret;
}
static std::unique_ptr<wchar_t const []>
utf8_to_wstr(char const * str) {
static std::unique_ptr<wchar_t const[]>
utf8_to_wstr(char const * str)
{
int size = ::MultiByteToWideChar(CP_UTF8, 0, str, -1, nullptr, 0);
if (size <= 0) {
return nullptr;
@ -3159,8 +3165,8 @@ utf8_to_wstr(char const * str) {
return ret;
}
static com_ptr<IMMDevice> wasapi_get_device_node(
IMMDeviceEnumerator * enumerator, IMMDevice * dev)
static com_ptr<IMMDevice>
wasapi_get_device_node(IMMDeviceEnumerator * enumerator, IMMDevice * dev)
{
com_ptr<IMMDevice> ret;
com_ptr<IDeviceTopology> devtopo;