Bug 1656438 - Run all MockCubebStreams off the same fake audio thread to avoid unwanted drift. r=padenot

With this patch, there is a fake audio thread present on a MockCubeb context as
soon as one MockCubebStream is running under that context. When the last running
MockCubebStream is stopped, the fake audio thread is joined and unset.

This adds a tad bit of complexity but results in zero unwanted drift between
MockCubebStreams under the same MockCubeb context. This is essential for stable
CrossGraphTrack tests.

A side effect of this is that the drift factor of a MockCubebStream does not
affect the interval at which data is processed, but rather the amount of data
processed each interval.

This patch also allows us to process data with virtually no wait time between
iterations (as opposed to wall-time 10ms-waits), for (much) speedier tests.

Differential Revision: https://phabricator.services.mozilla.com/D89772
This commit is contained in:
Andreas Pehrson 2020-09-15 14:42:20 +00:00
Родитель c543f24c3a
Коммит 6bdac502bc
2 изменённых файлов: 129 добавлений и 85 удалений

Просмотреть файл

@ -116,6 +116,7 @@ cubeb_ops const mock_ops = {
/*.stream_device_destroy =*/NULL,
/*.stream_register_device_changed_callback =*/NULL,
/*.register_device_collection_changed =*/
cubeb_mock_register_device_collection_changed};
// Represents the fake cubeb_stream. The context instance is needed to
@ -151,27 +152,10 @@ class MockCubebStream {
}
}
~MockCubebStream() { assert(!mFakeAudioThread); }
~MockCubebStream() = default;
int Start() {
assert(!mFakeAudioThread);
mStreamStop = false;
mFakeAudioThread.reset(new std::thread(ThreadFunction_s, this));
assert(mFakeAudioThread);
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
mStateCallback(stream, mUserPtr, CUBEB_STATE_STARTED);
return CUBEB_OK;
}
int Stop() {
assert(mFakeAudioThread);
mStreamStop = true;
mFakeAudioThread->join();
mFakeAudioThread.reset();
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
mStateCallback(stream, mUserPtr, CUBEB_STATE_STOPPED);
return CUBEB_OK;
}
int Start();
int Stop();
cubeb_devid GetInputDeviceID() const { return mInputDeviceID; }
cubeb_devid GetOutputDeviceID() const { return mOutputDeviceID; }
@ -181,8 +165,6 @@ class MockCubebStream {
uint32_t InputFrequency() const { return mAudioGenerator.mFrequency; }
void SetDriftFactor(float aDriftFactor) { mDriftFactor = aDriftFactor; }
void GoFaster() { mFastMode = true; }
void DontGoFaster() { mFastMode = false; }
void ForceError() { mForceErrorState = true; }
MediaEventSource<uint32_t>& FramesProcessedEvent() {
@ -196,56 +178,41 @@ class MockCubebStream {
MediaEventSource<void>& ErrorForcedEvent() { return mErrorForcedEvent; }
private:
// Simulates the audio thread. The thread is created at Start and destroyed
// at Stop. At next StreamStart a new thread is created.
static void ThreadFunction_s(MockCubebStream* that) {
that->ThreadFunction();
}
void ThreadFunction() {
while (!mStreamStop) {
// The drift factor affects the callback interval, while the callback
// always contains 10ms of audio frames.
const uint32_t audioIntervalMs = 10;
const uint32_t nrFrames = static_cast<uint32_t>(
(mHasOutput ? mOutputParams.rate : mInputParams.rate) *
audioIntervalMs / 1000);
if (mInputParams.rate) {
mAudioGenerator.GenerateInterleaved(mInputBuffer, nrFrames);
}
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
long outframes =
mDataCallback(stream, mUserPtr, mHasInput ? mInputBuffer : nullptr,
mHasOutput ? mOutputBuffer : nullptr, nrFrames);
mAudioVerifier.AppendDataInterleaved(mOutputBuffer, outframes,
NUM_OF_CHANNELS);
if (mAudioVerifier.PreSilenceEnded()) {
mFramesProcessedEvent.Notify(outframes);
}
if (outframes < nrFrames) {
mStateCallback(stream, mUserPtr, CUBEB_STATE_DRAINED);
break;
}
if (mForceErrorState) {
mForceErrorState = false;
mStateCallback(stream, mUserPtr, CUBEB_STATE_ERROR);
mErrorForcedEvent.Notify();
break;
}
uint32_t sampleRate(mInputParams.rate ? mInputParams.rate
: mOutputParams.rate);
std::this_thread::sleep_for(std::chrono::microseconds(
mFastMode ? 0
: static_cast<int32_t>(NUM_OF_FRAMES * 1000 * 1000 /
mDriftFactor / sampleRate)));
void Process10Ms() {
if (mStreamStop) {
return;
}
const uint32_t nrFrames = static_cast<uint32_t>(
(mHasOutput ? mOutputParams.rate : mInputParams.rate) * mDriftFactor *
10 / PR_MSEC_PER_SEC);
if (mInputParams.rate) {
mAudioGenerator.GenerateInterleaved(mInputBuffer, nrFrames);
}
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
long outframes =
mDataCallback(stream, mUserPtr, mHasInput ? mInputBuffer : nullptr,
mHasOutput ? mOutputBuffer : nullptr, nrFrames);
mAudioVerifier.AppendDataInterleaved(mOutputBuffer, outframes,
NUM_OF_CHANNELS);
if (mAudioVerifier.PreSilenceEnded()) {
mFramesProcessedEvent.Notify(outframes);
}
if (outframes < nrFrames) {
mStateCallback(stream, mUserPtr, CUBEB_STATE_DRAINED);
mStreamStop = true;
return;
}
if (mForceErrorState) {
mForceErrorState = false;
mStateCallback(stream, mUserPtr, CUBEB_STATE_ERROR);
mErrorForcedEvent.Notify();
mStreamStop = true;
return;
}
mOutputVerificationEvent.Notify(MakeTuple(
mAudioVerifier.PreSilenceSamples(), mAudioVerifier.EstimatedFreq(),
mAudioVerifier.CountDiscontinuities()));
}
public:
@ -255,8 +222,6 @@ class MockCubebStream {
const bool mHasOutput;
private:
// Thread that simulates the audio thread.
std::unique_ptr<std::thread> mFakeAudioThread;
// Signal to the audio thread that stream is stopped.
std::atomic_bool mStreamStop{true};
// The audio buffer used on data callback.
@ -294,7 +259,7 @@ class MockCubebStream {
class MockCubeb {
public:
MockCubeb() : ops(&mock_ops) {}
~MockCubeb() = default;
~MockCubeb() { MOZ_ASSERT(!mFakeAudioThread); };
// Cubeb backend implementation
// This allows passing this class as a cubeb* instance.
cubeb* AsCubebContext() { return reinterpret_cast<cubeb*>(this); }
@ -465,15 +430,67 @@ class MockCubeb {
delete mockStream;
}
void GoFaster() { mFastMode = true; }
void DontGoFaster() { mFastMode = false; }
MediaEventSource<MockCubebStream*>& StreamInitEvent() {
return mStreamInitEvent;
}
MediaEventSource<void>& StreamDestroyEvent() { return mStreamDestroyEvent; }
// MockCubeb specific API
void StartStream(MockCubebStream* aStream) {
auto streams = mLiveStreams.Lock();
MOZ_ASSERT(!streams->Contains(aStream));
streams->AppendElement(aStream);
if (!mFakeAudioThread) {
mFakeAudioThread = WrapUnique(new std::thread(ThreadFunction_s, this));
}
}
void StopStream(MockCubebStream* aStream) {
UniquePtr<std::thread> audioThread;
{
auto streams = mLiveStreams.Lock();
MOZ_ASSERT(streams->Contains(aStream));
streams->RemoveElement(aStream);
MOZ_ASSERT(mFakeAudioThread);
if (streams->IsEmpty()) {
audioThread = std::move(mFakeAudioThread);
}
}
if (audioThread) {
audioThread->join();
}
}
// Simulates the audio thread. The thread is created at Start and destroyed
// at Stop. At next StreamStart a new thread is created.
static void ThreadFunction_s(MockCubeb* aContext) {
aContext->ThreadFunction();
}
void ThreadFunction() {
while (true) {
{
auto streams = mLiveStreams.Lock();
for (auto& stream : *streams) {
stream->Process10Ms();
}
if (streams->IsEmpty()) {
break;
}
}
std::this_thread::sleep_for(
std::chrono::microseconds(mFastMode ? 0 : 10 * PR_USEC_PER_MSEC));
}
}
private:
// This needs to have the exact same memory layout as a real cubeb backend.
// It's very important for this `ops` member to be the very first member of
// the class, and to not have any virtual members (to avoid having a vtable).
// the class, and to not have any virtual members (to avoid having a
// vtable).
const cubeb_ops* ops;
// The callback to call when the device list has been changed.
cubeb_device_collection_changed_callback
@ -484,18 +501,48 @@ class MockCubeb {
void* mInputDeviceCollectionChangeUserPtr = nullptr;
void* mOutputDeviceCollectionChangeUserPtr = nullptr;
void* mUserPtr = nullptr;
// Whether or not this backend supports device collection change notification
// via a system callback. If not, Gecko is expected to re-query the list every
// time.
// Whether or not this backend supports device collection change
// notification via a system callback. If not, Gecko is expected to re-query
// the list every time.
bool mSupportsDeviceCollectionChangedCallback = true;
// Our input and output devices.
nsTArray<cubeb_device_info> mInputDevices;
nsTArray<cubeb_device_info> mOutputDevices;
// The streams that are currently running.
DataMutex<nsTArray<MockCubebStream*>> mLiveStreams{"MockCubeb::mLiveStreams"};
// Thread that simulates the audio thread, shared across MockCubebStreams to
// avoid unintended drift. This is set together with mLiveStreams, under the
// mLiveStreams DataMutex.
UniquePtr<std::thread> mFakeAudioThread;
// Whether to run the fake audio thread in fast mode, not caring about wall
// clock time. false is default and means data is processed every 10ms. When
// true we sleep(0) between iterations instead of 10ms.
std::atomic<bool> mFastMode{false};
MediaEventProducer<MockCubebStream*> mStreamInitEvent;
MediaEventProducer<void> mStreamDestroyEvent;
};
int MockCubebStream::Start() {
mStreamStop = false;
reinterpret_cast<MockCubeb*>(context)->StartStream(this);
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
mStateCallback(stream, mUserPtr, CUBEB_STATE_STARTED);
return CUBEB_OK;
}
int MockCubebStream::Stop() {
mStreamStop = true;
mOutputVerificationEvent.Notify(MakeTuple(
mAudioVerifier.PreSilenceSamples(), mAudioVerifier.EstimatedFreq(),
mAudioVerifier.CountDiscontinuities()));
reinterpret_cast<MockCubeb*>(context)->StopStream(this);
cubeb_stream* stream = reinterpret_cast<cubeb_stream*>(this);
mStateCallback(stream, mUserPtr, CUBEB_STATE_STOPPED);
return CUBEB_OK;
}
void cubeb_mock_destroy(cubeb* context) {
delete reinterpret_cast<MockCubeb*>(context);
}

Просмотреть файл

@ -316,13 +316,13 @@ TEST(TestAudioTrackGraph, SourceTrack)
Unused << WaitFor(p);
// Wait for a second worth of audio data.
stream->GoFaster();
cubeb->GoFaster();
uint32_t totalFrames = 0;
WaitUntil(stream->FramesProcessedEvent(), [&](uint32_t aFrames) {
totalFrames += aFrames;
return totalFrames > static_cast<uint32_t>(graph->GraphRate());
});
stream->DontGoFaster();
cubeb->DontGoFaster();
// Clean up.
DispatchFunction([&] {
@ -424,17 +424,14 @@ void TestCrossGraphPort(uint32_t aInputRate, uint32_t aOutputRate,
partnerStream->SetDriftFactor(aDriftFactor);
inputStream->GoFaster();
partnerStream->GoFaster();
cubeb->GoFaster();
// Wait for 3s worth of audio data on the receiver stream.
uint32_t totalFrames = 0;
WaitUntil(partnerStream->FramesProcessedEvent(), [&](uint32_t aFrames) {
totalFrames += aFrames;
return totalFrames > static_cast<uint32_t>(partner->GraphRate() * 3);
});
inputStream->DontGoFaster();
partnerStream->DontGoFaster();
cubeb->DontGoFaster();
DispatchFunction([&] {
// Clean up on MainThread