gecko-dev/dom/media/MediaStreamGraph.cpp

3898 строки
138 KiB
C++

/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*-*/
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this file,
* You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "MediaStreamGraphImpl.h"
#include "mozilla/MathAlgorithms.h"
#include "mozilla/Unused.h"
#include "AudioSegment.h"
#include "VideoSegment.h"
#include "nsContentUtils.h"
#include "nsIObserver.h"
#include "nsPrintfCString.h"
#include "nsServiceManagerUtils.h"
#include "prerror.h"
#include "mozilla/Logging.h"
#include "mozilla/Attributes.h"
#include "TrackUnionStream.h"
#include "ImageContainer.h"
#include "AudioCaptureStream.h"
#include "AudioNodeStream.h"
#include "AudioNodeExternalInputStream.h"
#include "MediaStreamListener.h"
#include "MediaStreamVideoSink.h"
#include "mozilla/dom/BaseAudioContextBinding.h"
#include "mozilla/media/MediaUtils.h"
#include <algorithm>
#include "GeckoProfiler.h"
#include "VideoFrameContainer.h"
#include "mozilla/AbstractThread.h"
#include "mozilla/Unused.h"
#include "mtransport/runnable_utils.h"
#include "VideoUtils.h"
#include "GraphRunner.h"
#include "Tracing.h"
#include "webaudio/blink/DenormalDisabler.h"
#include "webaudio/blink/HRTFDatabaseLoader.h"
using namespace mozilla::layers;
using namespace mozilla::dom;
using namespace mozilla::gfx;
using namespace mozilla::media;
mozilla::AsyncLogger gMSGTraceLogger("MSGTracing");
namespace mozilla {
LazyLogModule gMediaStreamGraphLog("MediaStreamGraph");
#ifdef LOG
# undef LOG
#endif // LOG
#define LOG(type, msg) MOZ_LOG(gMediaStreamGraphLog, type, msg)
enum SourceMediaStream::TrackCommands : uint32_t {
TRACK_CREATE = TrackEventCommand::TRACK_EVENT_CREATED,
TRACK_END = TrackEventCommand::TRACK_EVENT_ENDED,
};
/**
* A hash table containing the graph instances, one per document.
*
* The key is a hash of nsPIDOMWindowInner, see `WindowToHash`.
*/
static nsDataHashtable<nsUint32HashKey, MediaStreamGraphImpl*> gGraphs;
MediaStreamGraphImpl::~MediaStreamGraphImpl() {
MOZ_ASSERT(mStreams.IsEmpty() && mSuspendedStreams.IsEmpty(),
"All streams should have been destroyed by messages from the main "
"thread");
LOG(LogLevel::Debug, ("MediaStreamGraph %p destroyed", this));
LOG(LogLevel::Debug, ("MediaStreamGraphImpl::~MediaStreamGraphImpl"));
#ifdef TRACING
gMSGTraceLogger.Stop();
#endif
}
void MediaStreamGraphImpl::AddStreamGraphThread(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
aStream->mTracksStartTime = mProcessedTime;
if (aStream->AsSourceStream()) {
SourceMediaStream* source = aStream->AsSourceStream();
TimeStamp currentTimeStamp = CurrentDriver()->GetCurrentTimeStamp();
TimeStamp processedTimeStamp =
currentTimeStamp + TimeDuration::FromSeconds(MediaTimeToSeconds(
mProcessedTime - IterationEnd()));
source->SetStreamTracksStartTimeStamp(processedTimeStamp);
}
if (aStream->IsSuspended()) {
mSuspendedStreams.AppendElement(aStream);
LOG(LogLevel::Debug,
("%p: Adding media stream %p, in the suspended stream array", this,
aStream));
} else {
mStreams.AppendElement(aStream);
LOG(LogLevel::Debug, ("%p: Adding media stream %p, count %zu", this,
aStream, mStreams.Length()));
}
SetStreamOrderDirty();
}
void MediaStreamGraphImpl::RemoveStreamGraphThread(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
// Remove references in mStreamUpdates before we allow aStream to die.
// Pending updates are not needed (since the main thread has already given
// up the stream) so we will just drop them.
{
MonitorAutoLock lock(mMonitor);
for (uint32_t i = 0; i < mStreamUpdates.Length(); ++i) {
if (mStreamUpdates[i].mStream == aStream) {
mStreamUpdates[i].mStream = nullptr;
}
}
}
// Ensure that mFirstCycleBreaker and mMixer are updated when necessary.
SetStreamOrderDirty();
if (aStream->IsSuspended()) {
mSuspendedStreams.RemoveElement(aStream);
} else {
mStreams.RemoveElement(aStream);
}
LOG(LogLevel::Debug, ("%p: Removed media stream %p, count %zu", this, aStream,
mStreams.Length()));
NS_RELEASE(aStream); // probably destroying it
}
StreamTime MediaStreamGraphImpl::GraphTimeToStreamTimeWithBlocking(
const MediaStream* aStream, GraphTime aTime) const {
MOZ_ASSERT(
aTime <= mStateComputedTime,
"Don't ask about times where we haven't made blocking decisions yet");
return std::max<StreamTime>(
0, std::min(aTime, aStream->mStartBlocking) - aStream->mTracksStartTime);
}
GraphTime MediaStreamGraphImpl::IterationEnd() const {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
return CurrentDriver()->IterationEnd();
}
void MediaStreamGraphImpl::UpdateCurrentTimeForStreams(
GraphTime aPrevCurrentTime) {
MOZ_ASSERT(OnGraphThread());
for (MediaStream* stream : AllStreams()) {
// Shouldn't have already notified of finish *and* have output!
MOZ_ASSERT_IF(stream->mStartBlocking > aPrevCurrentTime,
!stream->mNotifiedFinished);
// Calculate blocked time and fire Blocked/Unblocked events
GraphTime blockedTime = mStateComputedTime - stream->mStartBlocking;
NS_ASSERTION(blockedTime >= 0, "Error in blocking time");
stream->AdvanceTimeVaryingValuesToCurrentTime(mStateComputedTime,
blockedTime);
LOG(LogLevel::Verbose,
("%p: MediaStream %p bufferStartTime=%f blockedTime=%f", this, stream,
MediaTimeToSeconds(stream->mTracksStartTime),
MediaTimeToSeconds(blockedTime)));
stream->mStartBlocking = mStateComputedTime;
for (StreamTracks::TrackIter track(stream->mTracks); !track.IsEnded();
track.Next()) {
StreamTime streamCurrentTime =
stream->GraphTimeToStreamTime(mStateComputedTime);
if (track->IsEnded() && track->GetEnd() <= streamCurrentTime) {
if (!track->NotifiedEnded()) {
// Playout of this track ended and listeners have not been notified.
track->NotifyEnded();
for (const TrackBound<MediaStreamTrackListener>& listener :
stream->mTrackListeners) {
if (listener.mTrackID == track->GetID()) {
listener.mListener->NotifyOutput(this, track->GetEnd());
listener.mListener->NotifyEnded();
}
}
}
} else {
for (const TrackBound<MediaStreamTrackListener>& listener :
stream->mTrackListeners) {
if (listener.mTrackID == track->GetID()) {
listener.mListener->NotifyOutput(
this, streamCurrentTime - track->GetStart());
}
}
}
}
// The stream is fully finished when all of its track data has been played
// out.
if (stream->mFinished && !stream->mNotifiedFinished &&
mProcessedTime >= stream->StreamTimeToGraphTime(
stream->GetStreamTracks().GetLatestTrackEnd())) {
stream->mNotifiedFinished = true;
SetStreamOrderDirty();
}
}
}
template <typename C, typename Chunk>
void MediaStreamGraphImpl::ProcessChunkMetadataForInterval(MediaStream* aStream,
TrackID aTrackID,
C& aSegment,
StreamTime aStart,
StreamTime aEnd) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
MOZ_ASSERT(aStream);
MOZ_ASSERT(IsTrackIDExplicit(aTrackID));
StreamTime offset = 0;
for (typename C::ConstChunkIterator chunk(aSegment); !chunk.IsEnded();
chunk.Next()) {
if (offset >= aEnd) {
break;
}
offset += chunk->GetDuration();
if (chunk->IsNull() || offset < aStart) {
continue;
}
const PrincipalHandle& principalHandle = chunk->GetPrincipalHandle();
if (principalHandle != aSegment.GetLastPrincipalHandle()) {
aSegment.SetLastPrincipalHandle(principalHandle);
LOG(LogLevel::Debug,
("%p: MediaStream %p track %d, principalHandle "
"changed in %sChunk with duration %lld",
this, aStream, aTrackID,
aSegment.GetType() == MediaSegment::AUDIO ? "Audio" : "Video",
(long long)chunk->GetDuration()));
for (const TrackBound<MediaStreamTrackListener>& listener :
aStream->mTrackListeners) {
if (listener.mTrackID == aTrackID) {
listener.mListener->NotifyPrincipalHandleChanged(this,
principalHandle);
}
}
}
}
}
void MediaStreamGraphImpl::ProcessChunkMetadata(GraphTime aPrevCurrentTime) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
for (MediaStream* stream : AllStreams()) {
StreamTime iterationStart = stream->GraphTimeToStreamTime(aPrevCurrentTime);
StreamTime iterationEnd = stream->GraphTimeToStreamTime(mProcessedTime);
for (StreamTracks::TrackIter tracks(stream->mTracks); !tracks.IsEnded();
tracks.Next()) {
MediaSegment* segment = tracks->GetSegment();
if (!segment) {
continue;
}
if (tracks->GetType() == MediaSegment::AUDIO) {
AudioSegment* audio = static_cast<AudioSegment*>(segment);
ProcessChunkMetadataForInterval<AudioSegment, AudioChunk>(
stream, tracks->GetID(), *audio, iterationStart, iterationEnd);
} else if (tracks->GetType() == MediaSegment::VIDEO) {
VideoSegment* video = static_cast<VideoSegment*>(segment);
ProcessChunkMetadataForInterval<VideoSegment, VideoChunk>(
stream, tracks->GetID(), *video, iterationStart, iterationEnd);
} else {
MOZ_CRASH("Unknown track type");
}
}
}
}
GraphTime MediaStreamGraphImpl::WillUnderrun(MediaStream* aStream,
GraphTime aEndBlockingDecisions) {
// Finished streams can't underrun. ProcessedMediaStreams also can't cause
// underrun currently, since we'll always be able to produce data for them
// unless they block on some other stream.
if (aStream->mFinished || aStream->AsProcessedStream()) {
return aEndBlockingDecisions;
}
// This stream isn't finished or suspended. We don't need to call
// StreamTimeToGraphTime since an underrun is the only thing that can block
// it.
GraphTime bufferEnd = aStream->GetTracksEnd() + aStream->mTracksStartTime;
#ifdef DEBUG
if (bufferEnd < mProcessedTime) {
LOG(LogLevel::Error, ("%p: MediaStream %p underrun, "
"bufferEnd %f < mProcessedTime %f (%" PRId64
" < %" PRId64 "), Streamtime %" PRId64,
this, aStream, MediaTimeToSeconds(bufferEnd),
MediaTimeToSeconds(mProcessedTime), bufferEnd,
mProcessedTime, aStream->GetTracksEnd()));
aStream->DumpTrackInfo();
NS_ASSERTION(bufferEnd >= mProcessedTime, "Buffer underran");
}
#endif
return std::min(bufferEnd, aEndBlockingDecisions);
}
namespace {
// Value of mCycleMarker for unvisited streams in cycle detection.
const uint32_t NOT_VISITED = UINT32_MAX;
// Value of mCycleMarker for ordered streams in muted cycles.
const uint32_t IN_MUTED_CYCLE = 1;
} // namespace
bool MediaStreamGraphImpl::AudioTrackPresent() {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
bool audioTrackPresent = false;
for (MediaStream* stream : mStreams) {
if (stream->AsAudioNodeStream()) {
audioTrackPresent = true;
break;
}
for (StreamTracks::TrackIter it(stream->GetStreamTracks()); !it.IsEnded();
it.Next()) {
if (it->GetType() == MediaSegment::AUDIO && !it->NotifiedEnded()) {
audioTrackPresent = true;
break;
}
}
if (audioTrackPresent) {
break;
}
if (SourceMediaStream* source = stream->AsSourceStream()) {
if (source->HasPendingAudioTrack()) {
audioTrackPresent = true;
}
}
if (audioTrackPresent) {
break;
}
}
// XXX For some reason, there are race conditions when starting an audio input
// where we find no active audio tracks. In any case, if we have an active
// audio input we should not allow a switch back to a SystemClockDriver
if (!audioTrackPresent && mInputDeviceUsers.Count() != 0) {
NS_WARNING("No audio tracks, but full-duplex audio is enabled!!!!!");
audioTrackPresent = true;
}
return audioTrackPresent;
}
void MediaStreamGraphImpl::UpdateStreamOrder() {
MOZ_ASSERT(OnGraphThread());
bool audioTrackPresent = AudioTrackPresent();
// Note that this looks for any audio streams, input or output, and switches
// to a SystemClockDriver if there are none. However, if another is already
// pending, let that switch happen.
if (!audioTrackPresent && mRealtime &&
CurrentDriver()->AsAudioCallbackDriver()) {
MonitorAutoLock mon(mMonitor);
if (CurrentDriver()->AsAudioCallbackDriver()->IsStarted() &&
!(CurrentDriver()->Switching())) {
if (LifecycleStateRef() == LIFECYCLE_RUNNING) {
SystemClockDriver* driver = new SystemClockDriver(this);
CurrentDriver()->SwitchAtNextIteration(driver);
}
}
}
bool switching = false;
{
MonitorAutoLock mon(mMonitor);
switching = CurrentDriver()->Switching();
}
if (audioTrackPresent && mRealtime &&
!CurrentDriver()->AsAudioCallbackDriver() && !switching) {
MonitorAutoLock mon(mMonitor);
if (LifecycleStateRef() == LIFECYCLE_RUNNING) {
AudioCallbackDriver* driver =
new AudioCallbackDriver(this, AudioInputChannelCount());
CurrentDriver()->SwitchAtNextIteration(driver);
}
}
if (!mStreamOrderDirty) {
return;
}
mStreamOrderDirty = false;
// The algorithm for finding cycles is based on Tim Leslie's iterative
// implementation [1][2] of Pearce's variant [3] of Tarjan's strongly
// connected components (SCC) algorithm. There are variations (a) to
// distinguish whether streams in SCCs of size 1 are in a cycle and (b) to
// re-run the algorithm over SCCs with breaks at DelayNodes.
//
// [1] http://www.timl.id.au/?p=327
// [2]
// https://github.com/scipy/scipy/blob/e2c502fca/scipy/sparse/csgraph/_traversal.pyx#L582
// [3] http://citeseerx.ist.psu.edu/viewdoc/summary?doi=10.1.1.102.1707
//
// There are two stacks. One for the depth-first search (DFS),
mozilla::LinkedList<MediaStream> dfsStack;
// and another for streams popped from the DFS stack, but still being
// considered as part of SCCs involving streams on the stack.
mozilla::LinkedList<MediaStream> sccStack;
// An index into mStreams for the next stream found with no unsatisfied
// upstream dependencies.
uint32_t orderedStreamCount = 0;
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
MediaStream* s = mStreams[i];
ProcessedMediaStream* ps = s->AsProcessedStream();
if (ps) {
// The dfsStack initially contains a list of all processed streams in
// unchanged order.
dfsStack.insertBack(s);
ps->mCycleMarker = NOT_VISITED;
} else {
// SourceMediaStreams have no inputs and so can be ordered now.
mStreams[orderedStreamCount] = s;
++orderedStreamCount;
}
}
// mNextStackMarker corresponds to "index" in Tarjan's algorithm. It is a
// counter to label mCycleMarker on the next visited stream in the DFS
// uniquely in the set of visited streams that are still being considered.
//
// In this implementation, the counter descends so that the values are
// strictly greater than the values that mCycleMarker takes when the stream
// has been ordered (0 or IN_MUTED_CYCLE).
//
// Each new stream labelled, as the DFS searches upstream, receives a value
// less than those used for all other streams being considered.
uint32_t nextStackMarker = NOT_VISITED - 1;
// Reset list of DelayNodes in cycles stored at the tail of mStreams.
mFirstCycleBreaker = mStreams.Length();
// Rearrange dfsStack order as required to DFS upstream and pop streams
// in processing order to place in mStreams.
while (auto ps = static_cast<ProcessedMediaStream*>(dfsStack.getFirst())) {
const auto& inputs = ps->mInputs;
MOZ_ASSERT(ps->AsProcessedStream());
if (ps->mCycleMarker == NOT_VISITED) {
// Record the position on the visited stack, so that any searches
// finding this stream again know how much of the stack is in the cycle.
ps->mCycleMarker = nextStackMarker;
--nextStackMarker;
// Not-visited input streams should be processed first.
// SourceMediaStreams have already been ordered.
for (uint32_t i = inputs.Length(); i--;) {
if (inputs[i]->mSource->IsSuspended()) {
continue;
}
auto input = inputs[i]->mSource->AsProcessedStream();
if (input && input->mCycleMarker == NOT_VISITED) {
// It can be that this stream has an input which is from a suspended
// AudioContext.
if (input->isInList()) {
input->remove();
dfsStack.insertFront(input);
}
}
}
continue;
}
// Returning from DFS. Pop from dfsStack.
ps->remove();
// cycleStackMarker keeps track of the highest marker value on any
// upstream stream, if any, found receiving input, directly or indirectly,
// from the visited stack (and so from |ps|, making a cycle). In a
// variation from Tarjan's SCC algorithm, this does not include |ps|
// unless it is part of the cycle.
uint32_t cycleStackMarker = 0;
for (uint32_t i = inputs.Length(); i--;) {
if (inputs[i]->mSource->IsSuspended()) {
continue;
}
auto input = inputs[i]->mSource->AsProcessedStream();
if (input) {
cycleStackMarker = std::max(cycleStackMarker, input->mCycleMarker);
}
}
if (cycleStackMarker <= IN_MUTED_CYCLE) {
// All inputs have been ordered and their stack markers have been removed.
// This stream is not part of a cycle. It can be processed next.
ps->mCycleMarker = 0;
mStreams[orderedStreamCount] = ps;
++orderedStreamCount;
continue;
}
// A cycle has been found. Record this stream for ordering when all
// streams in this SCC have been popped from the DFS stack.
sccStack.insertFront(ps);
if (cycleStackMarker > ps->mCycleMarker) {
// Cycles have been found that involve streams that remain on the stack.
// Leave mCycleMarker indicating the most downstream (last) stream on
// the stack known to be part of this SCC. In this way, any searches on
// other paths that find |ps| will know (without having to traverse from
// this stream again) that they are part of this SCC (i.e. part of an
// intersecting cycle).
ps->mCycleMarker = cycleStackMarker;
continue;
}
// |ps| is the root of an SCC involving no other streams on dfsStack, the
// complete SCC has been recorded, and streams in this SCC are part of at
// least one cycle.
MOZ_ASSERT(cycleStackMarker == ps->mCycleMarker);
// If there are DelayNodes in this SCC, then they may break the cycles.
bool haveDelayNode = false;
auto next = sccStack.getFirst();
// Streams in this SCC are identified by mCycleMarker <= cycleStackMarker.
// (There may be other streams later in sccStack from other incompletely
// searched SCCs, involving streams still on dfsStack.)
//
// DelayNodes in cycles must behave differently from those not in cycles,
// so all DelayNodes in the SCC must be identified.
while (next && static_cast<ProcessedMediaStream*>(next)->mCycleMarker <=
cycleStackMarker) {
auto ns = next->AsAudioNodeStream();
// Get next before perhaps removing from list below.
next = next->getNext();
if (ns && ns->Engine()->AsDelayNodeEngine()) {
haveDelayNode = true;
// DelayNodes break cycles by producing their output in a
// preprocessing phase; they do not need to be ordered before their
// consumers. Order them at the tail of mStreams so that they can be
// handled specially. Do so now, so that DFS ignores them.
ns->remove();
ns->mCycleMarker = 0;
--mFirstCycleBreaker;
mStreams[mFirstCycleBreaker] = ns;
}
}
auto after_scc = next;
while ((next = sccStack.getFirst()) != after_scc) {
next->remove();
auto removed = static_cast<ProcessedMediaStream*>(next);
if (haveDelayNode) {
// Return streams to the DFS stack again (to order and detect cycles
// without delayNodes). Any of these streams that are still inputs
// for streams on the visited stack must be returned to the front of
// the stack to be ordered before their dependents. We know that none
// of these streams need input from streams on the visited stack, so
// they can all be searched and ordered before the current stack head
// is popped.
removed->mCycleMarker = NOT_VISITED;
dfsStack.insertFront(removed);
} else {
// Streams in cycles without any DelayNodes must be muted, and so do
// not need input and can be ordered now. They must be ordered before
// their consumers so that their muted output is available.
removed->mCycleMarker = IN_MUTED_CYCLE;
mStreams[orderedStreamCount] = removed;
++orderedStreamCount;
}
}
}
MOZ_ASSERT(orderedStreamCount == mFirstCycleBreaker);
}
void MediaStreamGraphImpl::CreateOrDestroyAudioStreams(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThread());
MOZ_ASSERT(mRealtime,
"Should only attempt to create audio streams in real-time mode");
if (aStream->mAudioOutputs.IsEmpty()) {
aStream->mAudioOutputStreams.Clear();
return;
}
if (!aStream->GetStreamTracks().GetAndResetTracksDirty() &&
!aStream->mAudioOutputStreams.IsEmpty()) {
return;
}
LOG(LogLevel::Debug,
("%p: Updating AudioOutputStreams for MediaStream %p", this, aStream));
AutoTArray<bool, 2> audioOutputStreamsFound;
for (uint32_t i = 0; i < aStream->mAudioOutputStreams.Length(); ++i) {
audioOutputStreamsFound.AppendElement(false);
}
for (StreamTracks::TrackIter tracks(aStream->GetStreamTracks(),
MediaSegment::AUDIO);
!tracks.IsEnded(); tracks.Next()) {
uint32_t i;
for (i = 0; i < audioOutputStreamsFound.Length(); ++i) {
if (aStream->mAudioOutputStreams[i].mTrackID == tracks->GetID()) {
break;
}
}
if (i < audioOutputStreamsFound.Length()) {
audioOutputStreamsFound[i] = true;
} else {
MediaStream::AudioOutputStream* audioOutputStream =
aStream->mAudioOutputStreams.AppendElement();
audioOutputStream->mAudioPlaybackStartTime = mProcessedTime;
audioOutputStream->mBlockedAudioTime = 0;
audioOutputStream->mLastTickWritten = 0;
audioOutputStream->mTrackID = tracks->GetID();
bool switching = false;
{
MonitorAutoLock lock(mMonitor);
switching = CurrentDriver()->Switching();
}
if (!CurrentDriver()->AsAudioCallbackDriver() && !switching) {
MonitorAutoLock mon(mMonitor);
if (LifecycleStateRef() == LIFECYCLE_RUNNING) {
AudioCallbackDriver* driver =
new AudioCallbackDriver(this, AudioInputChannelCount());
CurrentDriver()->SwitchAtNextIteration(driver);
}
}
}
}
for (int32_t i = audioOutputStreamsFound.Length() - 1; i >= 0; --i) {
if (!audioOutputStreamsFound[i]) {
aStream->mAudioOutputStreams.RemoveElementAt(i);
}
}
}
StreamTime MediaStreamGraphImpl::PlayAudio(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThread());
MOZ_ASSERT(mRealtime, "Should only attempt to play audio in realtime mode");
float volume = 0.0f;
for (uint32_t i = 0; i < aStream->mAudioOutputs.Length(); ++i) {
volume += aStream->mAudioOutputs[i].mVolume * mGlobalVolume;
}
StreamTime ticksWritten = 0;
for (uint32_t i = 0; i < aStream->mAudioOutputStreams.Length(); ++i) {
ticksWritten = 0;
MediaStream::AudioOutputStream& audioOutput =
aStream->mAudioOutputStreams[i];
StreamTracks::Track* track =
aStream->mTracks.FindTrack(audioOutput.mTrackID);
AudioSegment* audio = track->Get<AudioSegment>();
AudioSegment output;
StreamTime offset = aStream->GraphTimeToStreamTime(mProcessedTime);
// We don't update aStream->mTracksStartTime here to account for time spent
// blocked. Instead, we'll update it in UpdateCurrentTimeForStreams after
// the blocked period has completed. But we do need to make sure we play
// from the right offsets in the stream buffer, even if we've already
// written silence for some amount of blocked time after the current time.
GraphTime t = mProcessedTime;
while (t < mStateComputedTime) {
bool blocked = t >= aStream->mStartBlocking;
GraphTime end = blocked ? mStateComputedTime : aStream->mStartBlocking;
NS_ASSERTION(end <= mStateComputedTime, "mStartBlocking is wrong!");
// Check how many ticks of sound we can provide if we are blocked some
// time in the middle of this cycle.
StreamTime toWrite = end - t;
if (blocked) {
output.InsertNullDataAtStart(toWrite);
ticksWritten += toWrite;
LOG(LogLevel::Verbose,
("%p: MediaStream %p writing %" PRId64
" blocking-silence samples for "
"%f to %f (%" PRId64 " to %" PRId64 ")",
this, aStream, toWrite, MediaTimeToSeconds(t),
MediaTimeToSeconds(end), offset, offset + toWrite));
} else {
StreamTime endTicksNeeded = offset + toWrite;
StreamTime endTicksAvailable = audio->GetDuration();
if (endTicksNeeded <= endTicksAvailable) {
LOG(LogLevel::Verbose,
("%p: MediaStream %p writing %" PRId64 " samples for %f to %f "
"(samples %" PRId64 " to %" PRId64 ")",
this, aStream, toWrite, MediaTimeToSeconds(t),
MediaTimeToSeconds(end), offset, endTicksNeeded));
output.AppendSlice(*audio, offset, endTicksNeeded);
ticksWritten += toWrite;
offset = endTicksNeeded;
} else {
// MOZ_ASSERT(track->IsEnded(), "Not enough data, and track not
// ended."); If we are at the end of the track, maybe write the
// remaining samples, and pad with/output silence.
if (endTicksNeeded > endTicksAvailable &&
offset < endTicksAvailable) {
output.AppendSlice(*audio, offset, endTicksAvailable);
LOG(LogLevel::Verbose,
("%p: MediaStream %p writing %" PRId64 " samples for %f to %f "
"(samples %" PRId64 " to %" PRId64 ")",
this, aStream, toWrite, MediaTimeToSeconds(t),
MediaTimeToSeconds(end), offset, endTicksNeeded));
uint32_t available = endTicksAvailable - offset;
ticksWritten += available;
toWrite -= available;
offset = endTicksAvailable;
}
output.AppendNullData(toWrite);
LOG(LogLevel::Verbose,
("%p MediaStream %p writing %" PRId64
" padding slsamples for %f to "
"%f (samples %" PRId64 " to %" PRId64 ")",
this, aStream, toWrite, MediaTimeToSeconds(t),
MediaTimeToSeconds(end), offset, endTicksNeeded));
ticksWritten += toWrite;
}
output.ApplyVolume(volume);
}
t = end;
}
audioOutput.mLastTickWritten = offset;
output.WriteTo(mMixer, AudioOutputChannelCount(), mSampleRate);
}
return ticksWritten;
}
void MediaStreamGraphImpl::OpenAudioInputImpl(CubebUtils::AudioDeviceID aID,
AudioDataListener* aListener) {
MOZ_ASSERT(OnGraphThread());
// Only allow one device per MSG (hence, per document), but allow opening a
// device multiple times
nsTArray<RefPtr<AudioDataListener>>& listeners =
mInputDeviceUsers.GetOrInsert(aID);
if (listeners.IsEmpty() && mInputDeviceUsers.Count() > 1) {
// We don't support opening multiple input device in a graph for now.
listeners.RemoveElement(aID);
return;
}
MOZ_ASSERT(!listeners.Contains(aListener), "Don't add a listener twice.");
listeners.AppendElement(aListener);
if (listeners.Length() == 1) { // first open for this device
mInputDeviceID = aID;
// Switch Drivers since we're adding input (to input-only or full-duplex)
MonitorAutoLock mon(mMonitor);
if (LifecycleStateRef() == LIFECYCLE_RUNNING) {
AudioCallbackDriver* driver =
new AudioCallbackDriver(this, AudioInputChannelCount());
LOG(LogLevel::Debug,
("%p OpenAudioInput: starting new AudioCallbackDriver(input) %p",
this, driver));
CurrentDriver()->SwitchAtNextIteration(driver);
} else {
LOG(LogLevel::Error, ("OpenAudioInput in shutdown!"));
MOZ_ASSERT_UNREACHABLE("Can't open cubeb inputs in shutdown");
}
}
}
nsresult MediaStreamGraphImpl::OpenAudioInput(CubebUtils::AudioDeviceID aID,
AudioDataListener* aListener) {
// So, so, so annoying. Can't AppendMessage except on Mainthread
if (!NS_IsMainThread()) {
RefPtr<nsIRunnable> runnable =
WrapRunnable(this, &MediaStreamGraphImpl::OpenAudioInput, aID,
RefPtr<AudioDataListener>(aListener));
mAbstractMainThread->Dispatch(runnable.forget());
return NS_OK;
}
class Message : public ControlMessage {
public:
Message(MediaStreamGraphImpl* aGraph, CubebUtils::AudioDeviceID aID,
AudioDataListener* aListener)
: ControlMessage(nullptr),
mGraph(aGraph),
mID(aID),
mListener(aListener) {}
void Run() override { mGraph->OpenAudioInputImpl(mID, mListener); }
MediaStreamGraphImpl* mGraph;
CubebUtils::AudioDeviceID mID;
RefPtr<AudioDataListener> mListener;
};
// XXX Check not destroyed!
this->AppendMessage(MakeUnique<Message>(this, aID, aListener));
return NS_OK;
}
void MediaStreamGraphImpl::CloseAudioInputImpl(
Maybe<CubebUtils::AudioDeviceID>& aID, AudioDataListener* aListener) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
// It is possible to not know the ID here, find it first.
if (aID.isNothing()) {
for (auto iter = mInputDeviceUsers.Iter(); !iter.Done(); iter.Next()) {
if (iter.Data().Contains(aListener)) {
aID = Some(iter.Key());
}
}
MOZ_ASSERT(aID.isSome(), "Closing an audio input that was not opened.");
}
nsTArray<RefPtr<AudioDataListener>>* listeners =
mInputDeviceUsers.GetValue(aID.value());
MOZ_ASSERT(listeners);
DebugOnly<bool> wasPresent = listeners->RemoveElement(aListener);
MOZ_ASSERT(wasPresent);
// Breaks the cycle between the MSG and the listener.
aListener->Disconnect(this);
if (!listeners->IsEmpty()) {
// There is still a consumer for this audio input device
return;
}
mInputDeviceID = nullptr; // reset to default
mInputDeviceUsers.Remove(aID.value());
// Switch Drivers since we're adding or removing an input (to nothing/system
// or output only)
bool audioTrackPresent = AudioTrackPresent();
MonitorAutoLock mon(mMonitor);
if (LifecycleStateRef() == LIFECYCLE_RUNNING) {
GraphDriver* driver;
if (audioTrackPresent) {
// We still have audio output
LOG(LogLevel::Debug,
("%p: CloseInput: output present (AudioCallback)", this));
driver = new AudioCallbackDriver(this, AudioInputChannelCount());
CurrentDriver()->SwitchAtNextIteration(driver);
} else if (CurrentDriver()->AsAudioCallbackDriver()) {
LOG(LogLevel::Debug,
("%p: CloseInput: no output present (SystemClockCallback)", this));
driver = new SystemClockDriver(this);
CurrentDriver()->SwitchAtNextIteration(driver);
} // else SystemClockDriver->SystemClockDriver, no switch
}
}
void MediaStreamGraphImpl::CloseAudioInput(
Maybe<CubebUtils::AudioDeviceID>& aID, AudioDataListener* aListener) {
// So, so, so annoying. Can't AppendMessage except on Mainthread
if (!NS_IsMainThread()) {
RefPtr<nsIRunnable> runnable =
WrapRunnable(this, &MediaStreamGraphImpl::CloseAudioInput, aID,
RefPtr<AudioDataListener>(aListener));
mAbstractMainThread->Dispatch(runnable.forget());
return;
}
class Message : public ControlMessage {
public:
Message(MediaStreamGraphImpl* aGraph, Maybe<CubebUtils::AudioDeviceID>& aID,
AudioDataListener* aListener)
: ControlMessage(nullptr),
mGraph(aGraph),
mID(aID),
mListener(aListener) {}
void Run() override { mGraph->CloseAudioInputImpl(mID, mListener); }
MediaStreamGraphImpl* mGraph;
Maybe<CubebUtils::AudioDeviceID> mID;
RefPtr<AudioDataListener> mListener;
};
this->AppendMessage(MakeUnique<Message>(this, aID, aListener));
}
// All AudioInput listeners get the same speaker data (at least for now).
void MediaStreamGraphImpl::NotifyOutputData(AudioDataValue* aBuffer,
size_t aFrames, TrackRate aRate,
uint32_t aChannels) {
#ifdef ANDROID
// On Android, mInputDeviceID is always null and represents the default
// device.
// The absence of an input consumer is enough to know we need to bail out
// here.
if (!mInputDeviceUsers.GetValue(mInputDeviceID)) {
return;
}
#else
if (!mInputDeviceID) {
return;
}
#endif
// When/if we decide to support multiple input devices per graph, this needs
// to loop over them.
nsTArray<RefPtr<AudioDataListener>>* listeners =
mInputDeviceUsers.GetValue(mInputDeviceID);
MOZ_ASSERT(listeners);
for (auto& listener : *listeners) {
listener->NotifyOutputData(this, aBuffer, aFrames, aRate, aChannels);
}
}
void MediaStreamGraphImpl::NotifyInputData(const AudioDataValue* aBuffer,
size_t aFrames, TrackRate aRate,
uint32_t aChannels) {
#ifdef ANDROID
if (!mInputDeviceUsers.GetValue(mInputDeviceID)) {
return;
}
#else
# ifdef DEBUG
{
MonitorAutoLock lock(mMonitor);
// Either we have an audio input device, or we just removed the audio input
// this iteration, and we're switching back to an output-only driver next
// iteration.
MOZ_ASSERT(mInputDeviceID || CurrentDriver()->Switching());
}
# endif
if (!mInputDeviceID) {
return;
}
#endif
nsTArray<RefPtr<AudioDataListener>>* listeners =
mInputDeviceUsers.GetValue(mInputDeviceID);
MOZ_ASSERT(listeners);
for (auto& listener : *listeners) {
listener->NotifyInputData(this, aBuffer, aFrames, aRate, aChannels);
}
}
void MediaStreamGraphImpl::DeviceChangedImpl() {
MOZ_ASSERT(OnGraphThread());
#ifdef ANDROID
if (!mInputDeviceUsers.GetValue(mInputDeviceID)) {
return;
}
#else
if (!mInputDeviceID) {
return;
}
#endif
nsTArray<RefPtr<AudioDataListener>>* listeners =
mInputDeviceUsers.GetValue(mInputDeviceID);
for (auto& listener : *listeners) {
listener->DeviceChanged(this);
}
}
void MediaStreamGraphImpl::DeviceChanged() {
// This is safe to be called from any thread: this message comes from an
// underlying platform API, and we don't have much guarantees. If it is not
// called from the main thread (and it probably will rarely be), it will post
// itself to the main thread, and the actual device change message will be ran
// and acted upon on the graph thread.
if (!NS_IsMainThread()) {
RefPtr<nsIRunnable> runnable =
WrapRunnable(this, &MediaStreamGraphImpl::DeviceChanged);
mAbstractMainThread->Dispatch(runnable.forget());
return;
}
class Message : public ControlMessage {
public:
explicit Message(MediaStreamGraph* aGraph)
: ControlMessage(nullptr),
mGraphImpl(static_cast<MediaStreamGraphImpl*>(aGraph)) {}
void Run() override { mGraphImpl->DeviceChangedImpl(); }
// We know that this is valid, because the graph can't shutdown if it has
// messages.
MediaStreamGraphImpl* mGraphImpl;
};
AppendMessage(MakeUnique<Message>(this));
}
void MediaStreamGraphImpl::ReevaluateInputDevice() {
MOZ_ASSERT(OnGraphThread());
bool needToSwitch = false;
if (CurrentDriver()->AsAudioCallbackDriver()) {
AudioCallbackDriver* audioCallbackDriver =
CurrentDriver()->AsAudioCallbackDriver();
if (audioCallbackDriver->InputChannelCount() != AudioInputChannelCount()) {
needToSwitch = true;
}
} else {
// We're already in the process of switching to a audio callback driver,
// which will happen at the next iteration.
// However, maybe it's not the correct number of channels. Re-query the
// correct channel amount at this time.
#ifdef DEBUG
MonitorAutoLock lock(mMonitor);
MOZ_ASSERT(CurrentDriver()->Switching());
#endif
needToSwitch = true;
}
if (needToSwitch) {
AudioCallbackDriver* newDriver =
new AudioCallbackDriver(this, AudioInputChannelCount());
{
MonitorAutoLock lock(mMonitor);
CurrentDriver()->SwitchAtNextIteration(newDriver);
}
}
}
bool MediaStreamGraphImpl::OnGraphThreadOrNotRunning() const {
// either we're on the right thread (and calling CurrentDriver() is safe),
// or we're going to fail the assert anyway, so don't cross-check
// via CurrentDriver().
return mDetectedNotRunning ? NS_IsMainThread() : OnGraphThread();
}
bool MediaStreamGraphImpl::OnGraphThread() const {
// we're on the right thread (and calling mDriver is safe),
MOZ_ASSERT(mDriver);
if (mGraphRunner && mGraphRunner->OnThread()) {
return true;
}
return mDriver->OnThread();
}
bool MediaStreamGraphImpl::ShouldUpdateMainThread() {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
if (mRealtime) {
return true;
}
TimeStamp now = TimeStamp::Now();
// For offline graphs, update now if there is no pending iteration or if it
// has been long enough since the last update.
if (!mNeedAnotherIteration ||
((now - mLastMainThreadUpdate).ToMilliseconds() >
CurrentDriver()->IterationDuration())) {
mLastMainThreadUpdate = now;
return true;
}
return false;
}
void MediaStreamGraphImpl::PrepareUpdatesToMainThreadState(bool aFinalUpdate) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
mMonitor.AssertCurrentThreadOwns();
// We don't want to frequently update the main thread about timing update
// when we are not running in realtime.
if (aFinalUpdate || ShouldUpdateMainThread()) {
// Strip updates that will be obsoleted below, so as to keep the length of
// mStreamUpdates sane.
size_t keptUpdateCount = 0;
for (size_t i = 0; i < mStreamUpdates.Length(); ++i) {
MediaStream* stream = mStreamUpdates[i].mStream;
// RemoveStreamGraphThread() clears mStream in updates for
// streams that are removed from the graph.
MOZ_ASSERT(!stream || stream->GraphImpl() == this);
if (!stream || stream->MainThreadNeedsUpdates()) {
// Discard this update as it has either been cleared when the stream
// was destroyed or there will be a newer update below.
continue;
}
if (keptUpdateCount != i) {
mStreamUpdates[keptUpdateCount] = std::move(mStreamUpdates[i]);
MOZ_ASSERT(!mStreamUpdates[i].mStream);
}
++keptUpdateCount;
}
mStreamUpdates.TruncateLength(keptUpdateCount);
mStreamUpdates.SetCapacity(mStreamUpdates.Length() + mStreams.Length() +
mSuspendedStreams.Length());
for (MediaStream* stream : AllStreams()) {
if (!stream->MainThreadNeedsUpdates()) {
continue;
}
StreamUpdate* update = mStreamUpdates.AppendElement();
update->mStream = stream;
// No blocking to worry about here, since we've passed
// UpdateCurrentTimeForStreams.
update->mNextMainThreadCurrentTime =
stream->GraphTimeToStreamTime(mProcessedTime);
update->mNextMainThreadFinished = stream->mNotifiedFinished;
}
mNextMainThreadGraphTime = mProcessedTime;
if (!mPendingUpdateRunnables.IsEmpty()) {
mUpdateRunnables.AppendElements(std::move(mPendingUpdateRunnables));
}
}
// If this is the final update, then a stable state event will soon be
// posted just before this thread finishes, and so there is no need to also
// post here.
if (!aFinalUpdate &&
// Don't send the message to the main thread if it's not going to have
// any work to do.
!(mUpdateRunnables.IsEmpty() && mStreamUpdates.IsEmpty())) {
EnsureStableStateEventPosted();
}
}
GraphTime MediaStreamGraphImpl::RoundUpToEndOfAudioBlock(GraphTime aTime) {
if (aTime % WEBAUDIO_BLOCK_SIZE == 0) {
return aTime;
}
return RoundUpToNextAudioBlock(aTime);
}
GraphTime MediaStreamGraphImpl::RoundUpToNextAudioBlock(GraphTime aTime) {
uint64_t block = aTime >> WEBAUDIO_BLOCK_SIZE_BITS;
uint64_t nextBlock = block + 1;
GraphTime nextTime = nextBlock << WEBAUDIO_BLOCK_SIZE_BITS;
return nextTime;
}
void MediaStreamGraphImpl::ProduceDataForStreamsBlockByBlock(
uint32_t aStreamIndex, TrackRate aSampleRate) {
MOZ_ASSERT(OnGraphThread());
MOZ_ASSERT(aStreamIndex <= mFirstCycleBreaker,
"Cycle breaker is not AudioNodeStream?");
GraphTime t = mProcessedTime;
while (t < mStateComputedTime) {
GraphTime next = RoundUpToNextAudioBlock(t);
for (uint32_t i = mFirstCycleBreaker; i < mStreams.Length(); ++i) {
auto ns = static_cast<AudioNodeStream*>(mStreams[i]);
MOZ_ASSERT(ns->AsAudioNodeStream());
ns->ProduceOutputBeforeInput(t);
}
for (uint32_t i = aStreamIndex; i < mStreams.Length(); ++i) {
ProcessedMediaStream* ps = mStreams[i]->AsProcessedStream();
if (ps) {
ps->ProcessInput(t, next,
(next == mStateComputedTime)
? ProcessedMediaStream::ALLOW_FINISH
: 0);
}
}
t = next;
}
NS_ASSERTION(t == mStateComputedTime,
"Something went wrong with rounding to block boundaries");
}
void MediaStreamGraphImpl::RunMessageAfterProcessing(
UniquePtr<ControlMessage> aMessage) {
MOZ_ASSERT(OnGraphThread());
if (mFrontMessageQueue.IsEmpty()) {
mFrontMessageQueue.AppendElement();
}
// Only one block is used for messages from the graph thread.
MOZ_ASSERT(mFrontMessageQueue.Length() == 1);
mFrontMessageQueue[0].mMessages.AppendElement(std::move(aMessage));
}
void MediaStreamGraphImpl::RunMessagesInQueue() {
TRACE_AUDIO_CALLBACK();
MOZ_ASSERT(OnGraphThread());
// Calculate independent action times for each batch of messages (each
// batch corresponding to an event loop task). This isolates the performance
// of different scripts to some extent.
for (uint32_t i = 0; i < mFrontMessageQueue.Length(); ++i) {
nsTArray<UniquePtr<ControlMessage>>& messages =
mFrontMessageQueue[i].mMessages;
for (uint32_t j = 0; j < messages.Length(); ++j) {
messages[j]->Run();
}
}
mFrontMessageQueue.Clear();
}
void MediaStreamGraphImpl::UpdateGraph(GraphTime aEndBlockingDecisions) {
TRACE_AUDIO_CALLBACK();
MOZ_ASSERT(OnGraphThread());
MOZ_ASSERT(aEndBlockingDecisions >= mProcessedTime);
// The next state computed time can be the same as the previous: it
// means the driver would have been blocking indefinitly, but the graph has
// been woken up right after having been to sleep.
MOZ_ASSERT(aEndBlockingDecisions >= mStateComputedTime);
UpdateStreamOrder();
bool ensureNextIteration = false;
for (MediaStream* stream : mStreams) {
if (SourceMediaStream* is = stream->AsSourceStream()) {
ensureNextIteration |= is->PullNewData(aEndBlockingDecisions);
is->ExtractPendingInput(mStateComputedTime);
}
if (stream->mFinished) {
// The stream's not suspended, and since it's finished, underruns won't
// stop it playing out. So there's no blocking other than what we impose
// here.
GraphTime endTime = stream->GetStreamTracks().GetLatestTrackEnd() +
stream->mTracksStartTime;
if (endTime <= mStateComputedTime) {
LOG(LogLevel::Verbose,
("%p: MediaStream %p is blocked due to being finished", this,
stream));
stream->mStartBlocking = mStateComputedTime;
} else {
LOG(LogLevel::Verbose,
("%p: MediaStream %p is finished, but not blocked yet (end at %f, "
"with "
"blocking at %f)",
this, stream, MediaTimeToSeconds(stream->GetTracksEnd()),
MediaTimeToSeconds(endTime)));
// Data can't be added to a finished stream, so underruns are
// irrelevant.
stream->mStartBlocking = std::min(endTime, aEndBlockingDecisions);
}
} else {
stream->mStartBlocking = WillUnderrun(stream, aEndBlockingDecisions);
#ifdef MOZ_DIAGNOSTIC_ASSERT_ENABLED
if (SourceMediaStream* s = stream->AsSourceStream()) {
for (StreamTracks::TrackIter i(s->mTracks); !i.IsEnded(); i.Next()) {
if (i->IsEnded()) {
continue;
}
SourceMediaStream::TrackData* data;
{
MutexAutoLock lock(s->mMutex);
data = s->FindDataForTrack(i->GetID());
}
MOZ_ASSERT(data);
if (!data->mPullingEnabled) {
continue;
}
if (i->GetEnd() <
stream->GraphTimeToStreamTime(aEndBlockingDecisions)) {
LOG(LogLevel::Error,
("%p: SourceMediaStream %p track %u (%s) is live and pulled, "
"but wasn't fed "
"enough data. TrackListeners=%zu. Track-end=%f, "
"Iteration-end=%f",
this, stream, i->GetID(),
(i->GetType() == MediaSegment::AUDIO ? "audio" : "video"),
stream->mTrackListeners.Length(),
MediaTimeToSeconds(i->GetEnd()),
MediaTimeToSeconds(
stream->GraphTimeToStreamTime(aEndBlockingDecisions))));
MOZ_DIAGNOSTIC_ASSERT(false,
"A non-finished SourceMediaStream wasn't fed "
"enough data by NotifyPull");
}
}
}
#endif /* MOZ_DIAGNOSTIC_ASSERT_ENABLED */
}
}
for (MediaStream* stream : mSuspendedStreams) {
stream->mStartBlocking = mStateComputedTime;
}
// If the loop is woken up so soon that IterationEnd() barely advances or
// if an offline graph is not currently rendering, we end up having
// aEndBlockingDecisions == mStateComputedTime.
// Since the process interval [mStateComputedTime, aEndBlockingDecision) is
// empty, Process() will not find any unblocked stream and so will not
// ensure another iteration. If the graph should be rendering, then ensure
// another iteration to render.
if (ensureNextIteration || (aEndBlockingDecisions == mStateComputedTime &&
mStateComputedTime < mEndTime)) {
EnsureNextIteration();
}
}
void MediaStreamGraphImpl::Process() {
TRACE_AUDIO_CALLBACK();
MOZ_ASSERT(OnGraphThread());
// Play stream contents.
bool allBlockedForever = true;
// True when we've done ProcessInput for all processed streams.
bool doneAllProducing = false;
// This is the number of frame that are written to the AudioStreams, for
// this cycle.
StreamTime ticksPlayed = 0;
mMixer.StartMixing();
// Figure out what each stream wants to do
for (uint32_t i = 0; i < mStreams.Length(); ++i) {
MediaStream* stream = mStreams[i];
if (!doneAllProducing) {
ProcessedMediaStream* ps = stream->AsProcessedStream();
if (ps) {
AudioNodeStream* n = stream->AsAudioNodeStream();
if (n) {
#ifdef DEBUG
// Verify that the sampling rate for all of the following streams is
// the same
for (uint32_t j = i + 1; j < mStreams.Length(); ++j) {
AudioNodeStream* nextStream = mStreams[j]->AsAudioNodeStream();
if (nextStream) {
MOZ_ASSERT(n->SampleRate() == nextStream->SampleRate(),
"All AudioNodeStreams in the graph must have the same "
"sampling rate");
}
}
#endif
// Since an AudioNodeStream is present, go ahead and
// produce audio block by block for all the rest of the streams.
ProduceDataForStreamsBlockByBlock(i, n->SampleRate());
doneAllProducing = true;
} else {
ps->ProcessInput(mProcessedTime, mStateComputedTime,
ProcessedMediaStream::ALLOW_FINISH);
NS_ASSERTION(
stream->mTracks.GetEarliestTrackEnd() >=
GraphTimeToStreamTimeWithBlocking(stream, mStateComputedTime),
"Stream did not produce enough data");
}
}
}
// Only playback audio and video in real-time mode
if (mRealtime) {
CreateOrDestroyAudioStreams(stream);
if (CurrentDriver()->AsAudioCallbackDriver()) {
StreamTime ticksPlayedForThisStream = PlayAudio(stream);
if (!ticksPlayed) {
ticksPlayed = ticksPlayedForThisStream;
} else {
MOZ_ASSERT(!ticksPlayedForThisStream ||
ticksPlayedForThisStream == ticksPlayed,
"Each stream should have the same number of frame.");
}
}
}
if (stream->mStartBlocking > mProcessedTime) {
allBlockedForever = false;
}
}
if (CurrentDriver()->AsAudioCallbackDriver()) {
if (!ticksPlayed) {
// Nothing was played, so the mixer doesn't know how many frames were
// processed. We still tell it so AudioCallbackDriver knows how much has
// been processed. (bug 1406027)
mMixer.Mix(nullptr,
CurrentDriver()->AsAudioCallbackDriver()->OutputChannelCount(),
mStateComputedTime - mProcessedTime, mSampleRate);
}
mMixer.FinishMixing();
}
if (!allBlockedForever) {
EnsureNextIteration();
}
}
bool MediaStreamGraphImpl::UpdateMainThreadState() {
MOZ_ASSERT(OnGraphThread());
MonitorAutoLock lock(mMonitor);
bool finalUpdate =
mForceShutDown || (IsEmpty() && mBackMessageQueue.IsEmpty());
PrepareUpdatesToMainThreadState(finalUpdate);
if (finalUpdate) {
// Enter shutdown mode when this iteration is completed.
// No need to Destroy streams here. The main-thread owner of each
// stream is responsible for calling Destroy on them.
return false;
}
CurrentDriver()->WaitForNextIteration();
SwapMessageQueues();
return true;
}
bool MediaStreamGraphImpl::OneIteration(GraphTime aStateEnd) {
if (mGraphRunner) {
return mGraphRunner->OneIteration(aStateEnd);
}
return OneIterationImpl(aStateEnd);
}
bool MediaStreamGraphImpl::OneIterationImpl(GraphTime aStateEnd) {
TRACE_AUDIO_CALLBACK();
// Changes to LIFECYCLE_RUNNING occur before starting or reviving the graph
// thread, and so the monitor need not be held to check mLifecycleState.
// LIFECYCLE_THREAD_NOT_STARTED is possible when shutting down offline
// graphs that have not started.
MOZ_DIAGNOSTIC_ASSERT(mLifecycleState <= LIFECYCLE_RUNNING);
MOZ_ASSERT(OnGraphThread());
WebCore::DenormalDisabler disabler;
// Process graph message from the main thread for this iteration.
RunMessagesInQueue();
GraphTime stateEnd = std::min(aStateEnd, GraphTime(mEndTime));
UpdateGraph(stateEnd);
mStateComputedTime = stateEnd;
Process();
GraphTime oldProcessedTime = mProcessedTime;
mProcessedTime = stateEnd;
UpdateCurrentTimeForStreams(oldProcessedTime);
ProcessChunkMetadata(oldProcessedTime);
// Process graph messages queued from RunMessageAfterProcessing() on this
// thread during the iteration.
RunMessagesInQueue();
return UpdateMainThreadState();
}
void MediaStreamGraphImpl::ApplyStreamUpdate(StreamUpdate* aUpdate) {
MOZ_ASSERT(NS_IsMainThread());
mMonitor.AssertCurrentThreadOwns();
MediaStream* stream = aUpdate->mStream;
if (!stream) return;
stream->mMainThreadCurrentTime = aUpdate->mNextMainThreadCurrentTime;
stream->mMainThreadFinished = aUpdate->mNextMainThreadFinished;
if (stream->ShouldNotifyStreamFinished()) {
stream->NotifyMainThreadListeners();
}
}
void MediaStreamGraphImpl::ForceShutDown(
media::ShutdownTicket* aShutdownTicket) {
MOZ_ASSERT(NS_IsMainThread(), "Must be called on main thread");
LOG(LogLevel::Debug, ("%p: MediaStreamGraph::ForceShutdown", this));
if (aShutdownTicket) {
MOZ_ASSERT(!mForceShutdownTicket);
// Avoid waiting forever for a graph to shut down
// synchronously. Reports are that some 3rd-party audio drivers
// occasionally hang in shutdown (both for us and Chrome).
NS_NewTimerWithCallback(
getter_AddRefs(mShutdownTimer), this,
MediaStreamGraph::AUDIO_CALLBACK_DRIVER_SHUTDOWN_TIMEOUT,
nsITimer::TYPE_ONE_SHOT);
}
mForceShutdownTicket = aShutdownTicket;
MonitorAutoLock lock(mMonitor);
mForceShutDown = true;
if (LifecycleStateRef() == LIFECYCLE_THREAD_NOT_STARTED) {
// We *could* have just sent this a message to start up, so don't
// yank the rug out from under it. Tell it to startup and let it
// shut down.
RefPtr<GraphDriver> driver = CurrentDriver();
MonitorAutoUnlock unlock(mMonitor);
driver->Start();
}
EnsureNextIterationLocked();
}
NS_IMETHODIMP
MediaStreamGraphImpl::Notify(nsITimer* aTimer) {
MOZ_ASSERT(NS_IsMainThread());
NS_ASSERTION(!mForceShutdownTicket,
"MediaStreamGraph took too long to shut down!");
// Sigh, graph took too long to shut down. Stop blocking system
// shutdown and hope all is well.
mForceShutdownTicket = nullptr;
return NS_OK;
}
NS_IMETHODIMP
MediaStreamGraphImpl::GetName(nsACString& aName) {
aName.AssignLiteral("MediaStreamGraphImpl");
return NS_OK;
}
/* static */ StaticRefPtr<nsIAsyncShutdownBlocker>
gMediaStreamGraphShutdownBlocker;
namespace {
class MediaStreamGraphShutDownRunnable : public Runnable {
public:
explicit MediaStreamGraphShutDownRunnable(MediaStreamGraphImpl* aGraph)
: Runnable("MediaStreamGraphShutDownRunnable"), mGraph(aGraph) {}
NS_IMETHOD Run() override {
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(mGraph->mDetectedNotRunning && mGraph->mDriver,
"We should know the graph thread control loop isn't running!");
LOG(LogLevel::Debug, ("%p: Shutting down graph", mGraph.get()));
// We've asserted the graph isn't running. Use mDriver instead of
// CurrentDriver to avoid thread-safety checks
#if 0 // AudioCallbackDrivers are released asynchronously anyways
// XXX a better test would be have setting mDetectedNotRunning make sure
// any current callback has finished and block future ones -- or just
// handle it all in Shutdown()!
if (mGraph->mDriver->AsAudioCallbackDriver()) {
MOZ_ASSERT(!mGraph->mDriver->AsAudioCallbackDriver()->InCallback());
}
#endif
if (mGraph->mGraphRunner) {
mGraph->mGraphRunner->Shutdown();
}
mGraph->mDriver
->Shutdown(); // This will wait until it's shutdown since
// we'll start tearing down the graph after this
// Release the driver now so that an AudioCallbackDriver will release its
// SharedThreadPool reference. Each SharedThreadPool reference must be
// released before SharedThreadPool::SpinUntilEmpty() runs on
// xpcom-shutdown-threads. Don't wait for GC/CC to release references to
// objects owning streams, or for expiration of mGraph->mShutdownTimer,
// which won't otherwise release its reference on the graph until
// nsTimerImpl::Shutdown(), which runs after xpcom-shutdown-threads.
{
MonitorAutoLock mon(mGraph->mMonitor);
mGraph->SetCurrentDriver(nullptr);
}
// Safe to access these without the monitor since the graph isn't running.
// We may be one of several graphs. Drop ticket to eventually unblock
// shutdown.
if (mGraph->mShutdownTimer && !mGraph->mForceShutdownTicket) {
MOZ_ASSERT(
false,
"AudioCallbackDriver took too long to shut down and we let shutdown"
" continue - freezing and leaking");
// The timer fired, so we may be deeper in shutdown now. Block any
// further teardown and just leak, for safety.
return NS_OK;
}
// mGraph's thread is not running so it's OK to do whatever here
for (MediaStream* stream : mGraph->AllStreams()) {
// Clean up all MediaSegments since we cannot release Images too
// late during shutdown. Also notify listeners that they were removed
// so they can clean up any gfx resources.
if (SourceMediaStream* source = stream->AsSourceStream()) {
// Finishing a SourceStream prevents new data from being appended.
source->FinishOnGraphThread();
}
stream->GetStreamTracks().Clear();
stream->RemoveAllListenersImpl();
}
MOZ_ASSERT(mGraph->mUpdateRunnables.IsEmpty());
mGraph->mPendingUpdateRunnables.Clear();
mGraph->mForceShutdownTicket = nullptr;
// We can't block past the final LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION
// stage, since completion of that stage requires all streams to be freed,
// which requires shutdown to proceed.
if (mGraph->IsEmpty()) {
// mGraph is no longer needed, so delete it.
mGraph->Destroy();
} else {
// The graph is not empty. We must be in a forced shutdown, or a
// non-realtime graph that has finished processing. Some later
// AppendMessage will detect that the graph has been emptied, and
// delete it.
NS_ASSERTION(mGraph->mForceShutDown || !mGraph->mRealtime,
"Not in forced shutdown?");
mGraph->LifecycleStateRef() =
MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION;
}
return NS_OK;
}
private:
RefPtr<MediaStreamGraphImpl> mGraph;
};
class MediaStreamGraphStableStateRunnable : public Runnable {
public:
explicit MediaStreamGraphStableStateRunnable(MediaStreamGraphImpl* aGraph,
bool aSourceIsMSG)
: Runnable("MediaStreamGraphStableStateRunnable"),
mGraph(aGraph),
mSourceIsMSG(aSourceIsMSG) {}
NS_IMETHOD Run() override {
TRACE();
if (mGraph) {
mGraph->RunInStableState(mSourceIsMSG);
}
return NS_OK;
}
private:
RefPtr<MediaStreamGraphImpl> mGraph;
bool mSourceIsMSG;
};
/*
* Control messages forwarded from main thread to graph manager thread
*/
class CreateMessage : public ControlMessage {
public:
explicit CreateMessage(MediaStream* aStream) : ControlMessage(aStream) {}
void Run() override { mStream->GraphImpl()->AddStreamGraphThread(mStream); }
void RunDuringShutdown() override {
// Make sure to run this message during shutdown too, to make sure
// that we balance the number of streams registered with the graph
// as they're destroyed during shutdown.
Run();
}
};
} // namespace
void MediaStreamGraphImpl::RunInStableState(bool aSourceIsMSG) {
MOZ_ASSERT(NS_IsMainThread(), "Must be called on main thread");
nsTArray<nsCOMPtr<nsIRunnable>> runnables;
// When we're doing a forced shutdown, pending control messages may be
// run on the main thread via RunDuringShutdown. Those messages must
// run without the graph monitor being held. So, we collect them here.
nsTArray<UniquePtr<ControlMessage>> controlMessagesToRunDuringShutdown;
{
MonitorAutoLock lock(mMonitor);
if (aSourceIsMSG) {
MOZ_ASSERT(mPostedRunInStableStateEvent);
mPostedRunInStableStateEvent = false;
}
// This should be kept in sync with the LifecycleState enum in
// MediaStreamGraphImpl.h
const char* LifecycleState_str[] = {
"LIFECYCLE_THREAD_NOT_STARTED", "LIFECYCLE_RUNNING",
"LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP",
"LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN",
"LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION"};
if (LifecycleStateRef() != LIFECYCLE_RUNNING) {
LOG(LogLevel::Debug,
("%p: Running stable state callback. Current state: %s", this,
LifecycleState_str[LifecycleStateRef()]));
}
runnables.SwapElements(mUpdateRunnables);
for (uint32_t i = 0; i < mStreamUpdates.Length(); ++i) {
StreamUpdate* update = &mStreamUpdates[i];
if (update->mStream) {
ApplyStreamUpdate(update);
}
}
mStreamUpdates.Clear();
mMainThreadGraphTime = mNextMainThreadGraphTime;
if (mCurrentTaskMessageQueue.IsEmpty()) {
if (LifecycleStateRef() == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP &&
IsEmpty()) {
// Complete shutdown. First, ensure that this graph is no longer used.
// A new graph graph will be created if one is needed.
// Asynchronously clean up old graph. We don't want to do this
// synchronously because it spins the event loop waiting for threads
// to shut down, and we don't want to do that in a stable state handler.
LifecycleStateRef() = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
LOG(LogLevel::Debug,
("%p: Sending MediaStreamGraphShutDownRunnable", this));
nsCOMPtr<nsIRunnable> event =
new MediaStreamGraphShutDownRunnable(this);
mAbstractMainThread->Dispatch(event.forget());
LOG(LogLevel::Debug, ("%p: Disconnecting MediaStreamGraph", this));
// Find the graph in the hash table and remove it.
for (auto iter = gGraphs.Iter(); !iter.Done(); iter.Next()) {
if (iter.UserData() == this) {
iter.Remove();
break;
}
}
}
} else {
if (LifecycleStateRef() <= LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
MessageBlock* block = mBackMessageQueue.AppendElement();
block->mMessages.SwapElements(mCurrentTaskMessageQueue);
EnsureNextIterationLocked();
}
// If the MediaStreamGraph has more messages going to it, try to revive
// it to process those messages. Don't do this if we're in a forced
// shutdown or it's a non-realtime graph that has already terminated
// processing.
if (LifecycleStateRef() == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP &&
mRealtime && !mForceShutDown) {
LifecycleStateRef() = LIFECYCLE_RUNNING;
// Revive the MediaStreamGraph since we have more messages going to it.
// Note that we need to put messages into its queue before reviving it,
// or it might exit immediately.
{
LOG(LogLevel::Debug,
("%p: Reviving this graph! %s", this,
CurrentDriver()->AsAudioCallbackDriver() ? "AudioCallbackDriver"
: "SystemClockDriver"));
RefPtr<GraphDriver> driver = CurrentDriver();
MonitorAutoUnlock unlock(mMonitor);
driver->Revive();
}
}
}
if (LifecycleStateRef() == LIFECYCLE_THREAD_NOT_STARTED) {
LifecycleStateRef() = LIFECYCLE_RUNNING;
// Start the thread now. We couldn't start it earlier because
// the graph might exit immediately on finding it has no streams. The
// first message for a new graph must create a stream.
{
// We should exit the monitor for now, because starting a stream might
// take locks, and we don't want to deadlock.
LOG(LogLevel::Debug,
("%p: Starting a graph with a %s", this,
CurrentDriver()->AsAudioCallbackDriver() ? "AudioCallbackDriver"
: "SystemClockDriver"));
RefPtr<GraphDriver> driver = CurrentDriver();
MonitorAutoUnlock unlock(mMonitor);
driver->Start();
// It's not safe to Shutdown() a thread from StableState, and
// releasing this may shutdown a SystemClockDriver thread.
// Proxy the release to outside of StableState.
NS_ReleaseOnMainThreadSystemGroup("MediaStreamGraphImpl::CurrentDriver",
driver.forget(),
true); // always proxy
}
}
if ((mForceShutDown || !mRealtime) &&
LifecycleStateRef() == LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
// Defer calls to RunDuringShutdown() to happen while mMonitor is not
// held.
for (uint32_t i = 0; i < mBackMessageQueue.Length(); ++i) {
MessageBlock& mb = mBackMessageQueue[i];
controlMessagesToRunDuringShutdown.AppendElements(
std::move(mb.mMessages));
}
mBackMessageQueue.Clear();
MOZ_ASSERT(mCurrentTaskMessageQueue.IsEmpty());
// Stop MediaStreamGraph threads. Do not clear gGraph since
// we have outstanding DOM objects that may need it.
LifecycleStateRef() = LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
nsCOMPtr<nsIRunnable> event = new MediaStreamGraphShutDownRunnable(this);
mAbstractMainThread->Dispatch(event.forget());
}
mDetectedNotRunning = LifecycleStateRef() > LIFECYCLE_RUNNING;
}
// Make sure we get a new current time in the next event loop task
if (!aSourceIsMSG) {
MOZ_ASSERT(mPostedRunInStableState);
mPostedRunInStableState = false;
}
for (uint32_t i = 0; i < controlMessagesToRunDuringShutdown.Length(); ++i) {
controlMessagesToRunDuringShutdown[i]->RunDuringShutdown();
}
#ifdef DEBUG
mCanRunMessagesSynchronously =
mDetectedNotRunning &&
LifecycleStateRef() >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN;
#endif
for (uint32_t i = 0; i < runnables.Length(); ++i) {
runnables[i]->Run();
}
}
void MediaStreamGraphImpl::EnsureRunInStableState() {
MOZ_ASSERT(NS_IsMainThread(), "main thread only");
if (mPostedRunInStableState) return;
mPostedRunInStableState = true;
nsCOMPtr<nsIRunnable> event =
new MediaStreamGraphStableStateRunnable(this, false);
nsContentUtils::RunInStableState(event.forget());
}
void MediaStreamGraphImpl::EnsureStableStateEventPosted() {
MOZ_ASSERT(OnGraphThread());
mMonitor.AssertCurrentThreadOwns();
if (mPostedRunInStableStateEvent) return;
mPostedRunInStableStateEvent = true;
nsCOMPtr<nsIRunnable> event =
new MediaStreamGraphStableStateRunnable(this, true);
mAbstractMainThread->Dispatch(event.forget());
}
void MediaStreamGraphImpl::SignalMainThreadCleanup() {
MOZ_ASSERT(mDriver->OnThread());
MonitorAutoLock lock(mMonitor);
// LIFECYCLE_THREAD_NOT_STARTED is possible when shutting down offline
// graphs that have not started.
MOZ_DIAGNOSTIC_ASSERT(mLifecycleState <= LIFECYCLE_RUNNING);
LOG(LogLevel::Debug,
("%p: MediaStreamGraph waiting for main thread cleanup", this));
LifecycleStateRef() =
MediaStreamGraphImpl::LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP;
EnsureStableStateEventPosted();
}
void MediaStreamGraphImpl::AppendMessage(UniquePtr<ControlMessage> aMessage) {
MOZ_ASSERT(NS_IsMainThread(), "main thread only");
MOZ_ASSERT(!aMessage->GetStream() || !aMessage->GetStream()->IsDestroyed(),
"Stream already destroyed");
if (mDetectedNotRunning &&
LifecycleStateRef() > LIFECYCLE_WAITING_FOR_MAIN_THREAD_CLEANUP) {
// The graph control loop is not running and main thread cleanup has
// happened. From now on we can't append messages to
// mCurrentTaskMessageQueue, because that will never be processed again, so
// just RunDuringShutdown this message. This should only happen during
// forced shutdown, or after a non-realtime graph has finished processing.
#ifdef DEBUG
MOZ_ASSERT(mCanRunMessagesSynchronously);
mCanRunMessagesSynchronously = false;
#endif
aMessage->RunDuringShutdown();
#ifdef DEBUG
mCanRunMessagesSynchronously = true;
#endif
if (IsEmpty() &&
LifecycleStateRef() >= LIFECYCLE_WAITING_FOR_STREAM_DESTRUCTION) {
// Find the graph in the hash table and remove it.
for (auto iter = gGraphs.Iter(); !iter.Done(); iter.Next()) {
if (iter.UserData() == this) {
iter.Remove();
break;
}
}
Destroy();
}
return;
}
mCurrentTaskMessageQueue.AppendElement(std::move(aMessage));
EnsureRunInStableState();
}
void MediaStreamGraphImpl::Dispatch(already_AddRefed<nsIRunnable>&& aRunnable) {
mAbstractMainThread->Dispatch(std::move(aRunnable));
}
MediaStream::MediaStream()
: mTracksStartTime(0),
mStartBlocking(GRAPH_TIME_MAX),
mSuspendedCount(0),
mFinished(false),
mNotifiedFinished(false),
mHasCurrentData(false),
mMainThreadCurrentTime(0),
mMainThreadFinished(false),
mFinishedNotificationSent(false),
mMainThreadDestroyed(false),
mNrOfMainThreadUsers(0),
mGraph(nullptr) {
MOZ_COUNT_CTOR(MediaStream);
}
MediaStream::~MediaStream() {
MOZ_COUNT_DTOR(MediaStream);
NS_ASSERTION(mMainThreadDestroyed, "Should have been destroyed already");
NS_ASSERTION(mMainThreadListeners.IsEmpty(),
"All main thread listeners should have been removed");
}
size_t MediaStream::SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
size_t amount = 0;
// Not owned:
// - mGraph - Not reported here
// - mConsumers - elements
// Future:
// - mVideoOutputs - elements
// - mLastPlayedVideoFrame
// - mTrackListeners - elements
// - mAudioOutputStream - elements
amount += mTracks.SizeOfExcludingThis(aMallocSizeOf);
amount += mAudioOutputs.ShallowSizeOfExcludingThis(aMallocSizeOf);
amount += mVideoOutputs.ShallowSizeOfExcludingThis(aMallocSizeOf);
amount += mTrackListeners.ShallowSizeOfExcludingThis(aMallocSizeOf);
amount += mMainThreadListeners.ShallowSizeOfExcludingThis(aMallocSizeOf);
amount += mDisabledTracks.ShallowSizeOfExcludingThis(aMallocSizeOf);
amount += mConsumers.ShallowSizeOfExcludingThis(aMallocSizeOf);
return amount;
}
size_t MediaStream::SizeOfIncludingThis(MallocSizeOf aMallocSizeOf) const {
return aMallocSizeOf(this) + SizeOfExcludingThis(aMallocSizeOf);
}
void MediaStream::IncrementSuspendCount() {
++mSuspendedCount;
if (mSuspendedCount == 1) {
for (uint32_t i = 0; i < mConsumers.Length(); ++i) {
mConsumers[i]->Suspended();
}
}
}
void MediaStream::DecrementSuspendCount() {
NS_ASSERTION(mSuspendedCount > 0, "Suspend count underrun");
--mSuspendedCount;
if (mSuspendedCount == 0) {
for (uint32_t i = 0; i < mConsumers.Length(); ++i) {
mConsumers[i]->Resumed();
}
}
}
MediaStreamGraphImpl* MediaStream::GraphImpl() { return mGraph; }
const MediaStreamGraphImpl* MediaStream::GraphImpl() const { return mGraph; }
MediaStreamGraph* MediaStream::Graph() { return mGraph; }
void MediaStream::SetGraphImpl(MediaStreamGraphImpl* aGraph) {
MOZ_ASSERT(!mGraph, "Should only be called once");
mGraph = aGraph;
mTracks.InitGraphRate(aGraph->GraphRate());
}
void MediaStream::SetGraphImpl(MediaStreamGraph* aGraph) {
MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(aGraph);
SetGraphImpl(graph);
}
StreamTime MediaStream::GraphTimeToStreamTime(GraphTime aTime) const {
NS_ASSERTION(mStartBlocking == GraphImpl()->mStateComputedTime ||
aTime <= mStartBlocking,
"Incorrectly ignoring blocking!");
return aTime - mTracksStartTime;
}
GraphTime MediaStream::StreamTimeToGraphTime(StreamTime aTime) const {
NS_ASSERTION(mStartBlocking == GraphImpl()->mStateComputedTime ||
aTime + mTracksStartTime <= mStartBlocking,
"Incorrectly ignoring blocking!");
return aTime + mTracksStartTime;
}
StreamTime MediaStream::GraphTimeToStreamTimeWithBlocking(
GraphTime aTime) const {
return GraphImpl()->GraphTimeToStreamTimeWithBlocking(this, aTime);
}
void MediaStream::FinishOnGraphThread() {
if (mFinished) {
return;
}
LOG(LogLevel::Debug, ("MediaStream %p will finish", this));
#ifdef DEBUG
if (!mGraph->mForceShutDown) {
// All tracks must be ended by the source before the stream finishes.
// The exception is in forced shutdown, where we finish all streams as is.
for (StreamTracks::TrackIter track(mTracks); !track.IsEnded();
track.Next()) {
if (!track->IsEnded()) {
LOG(LogLevel::Error,
("MediaStream %p will finish, but track %d has not ended.", this,
track->GetID()));
NS_ASSERTION(false, "Finished stream cannot contain live track");
}
}
}
#endif
mFinished = true;
// Let the MSG knows that this stream can be destroyed if necessary to avoid
// unnecessarily processing it in the future.
GraphImpl()->SetStreamOrderDirty();
}
StreamTracks::Track* MediaStream::FindTrack(TrackID aID) const {
return mTracks.FindTrack(aID);
}
StreamTracks::Track* MediaStream::EnsureTrack(TrackID aTrackId) {
StreamTracks::Track* track = mTracks.FindTrack(aTrackId);
if (!track) {
track = &mTracks.AddTrack(aTrackId, 0, new AudioSegment());
}
return track;
}
void MediaStream::RemoveAllListenersImpl() {
GraphImpl()->AssertOnGraphThreadOrNotRunning();
auto trackListeners(mTrackListeners);
for (auto& l : trackListeners) {
l.mListener->NotifyRemoved();
}
mTrackListeners.Clear();
RemoveAllDirectListenersImpl();
auto videoOutputs(mVideoOutputs);
for (auto& l : videoOutputs) {
l.mListener->NotifyRemoved();
}
mVideoOutputs.Clear();
}
void MediaStream::DestroyImpl() {
for (int32_t i = mConsumers.Length() - 1; i >= 0; --i) {
mConsumers[i]->Disconnect();
}
mTracks.Clear();
mGraph = nullptr;
}
void MediaStream::Destroy() {
NS_ASSERTION(mNrOfMainThreadUsers == 0,
"Do not mix Destroy() and RegisterUser()/UnregisterUser()");
// Keep this stream alive until we leave this method
RefPtr<MediaStream> kungFuDeathGrip = this;
class Message : public ControlMessage {
public:
explicit Message(MediaStream* aStream) : ControlMessage(aStream) {}
void Run() override {
mStream->RemoveAllListenersImpl();
auto graph = mStream->GraphImpl();
mStream->DestroyImpl();
graph->RemoveStreamGraphThread(mStream);
}
void RunDuringShutdown() override { Run(); }
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this));
// Message::RunDuringShutdown may have removed this stream from the graph,
// but our kungFuDeathGrip above will have kept this stream alive if
// necessary.
mMainThreadDestroyed = true;
}
void MediaStream::RegisterUser() {
MOZ_ASSERT(NS_IsMainThread());
++mNrOfMainThreadUsers;
}
void MediaStream::UnregisterUser() {
MOZ_ASSERT(NS_IsMainThread());
--mNrOfMainThreadUsers;
NS_ASSERTION(mNrOfMainThreadUsers >= 0, "Double-removal of main thread user");
NS_ASSERTION(!IsDestroyed(),
"Do not mix Destroy() and RegisterUser()/UnregisterUser()");
if (mNrOfMainThreadUsers == 0) {
Destroy();
}
}
void MediaStream::AddAudioOutput(void* aKey) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey)
: ControlMessage(aStream), mKey(aKey) {}
void Run() override { mStream->AddAudioOutputImpl(mKey); }
void* mKey;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey));
}
void MediaStream::SetAudioOutputVolumeImpl(void* aKey, float aVolume) {
for (uint32_t i = 0; i < mAudioOutputs.Length(); ++i) {
if (mAudioOutputs[i].mKey == aKey) {
mAudioOutputs[i].mVolume = aVolume;
return;
}
}
NS_ERROR("Audio output key not found");
}
void MediaStream::SetAudioOutputVolume(void* aKey, float aVolume) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey, float aVolume)
: ControlMessage(aStream), mKey(aKey), mVolume(aVolume) {}
void Run() override { mStream->SetAudioOutputVolumeImpl(mKey, mVolume); }
void* mKey;
float mVolume;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey, aVolume));
}
void MediaStream::AddAudioOutputImpl(void* aKey) {
LOG(LogLevel::Info,
("MediaStream %p Adding AudioOutput for key %p", this, aKey));
mAudioOutputs.AppendElement(AudioOutput(aKey));
}
void MediaStream::RemoveAudioOutputImpl(void* aKey) {
LOG(LogLevel::Info,
("MediaStream %p Removing AudioOutput for key %p", this, aKey));
for (uint32_t i = 0; i < mAudioOutputs.Length(); ++i) {
if (mAudioOutputs[i].mKey == aKey) {
mAudioOutputs.RemoveElementAt(i);
return;
}
}
NS_ERROR("Audio output key not found");
}
void MediaStream::RemoveAudioOutput(void* aKey) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, void* aKey)
: ControlMessage(aStream), mKey(aKey) {}
void Run() override { mStream->RemoveAudioOutputImpl(mKey); }
void* mKey;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aKey));
}
void MediaStream::AddVideoOutputImpl(
already_AddRefed<MediaStreamVideoSink> aSink, TrackID aID) {
RefPtr<MediaStreamVideoSink> sink = aSink;
LOG(LogLevel::Info,
("MediaStream %p Adding MediaStreamVideoSink %p as output", this,
sink.get()));
MOZ_ASSERT(aID != TRACK_NONE);
for (auto entry : mVideoOutputs) {
if (entry.mListener == sink &&
(entry.mTrackID == TRACK_ANY || entry.mTrackID == aID)) {
return;
}
}
TrackBound<MediaStreamVideoSink>* l = mVideoOutputs.AppendElement();
l->mListener = sink;
l->mTrackID = aID;
AddDirectTrackListenerImpl(sink.forget(), aID);
}
void MediaStream::RemoveVideoOutputImpl(MediaStreamVideoSink* aSink,
TrackID aID) {
LOG(LogLevel::Info,
("MediaStream %p Removing MediaStreamVideoSink %p as output", this,
aSink));
MOZ_ASSERT(aID != TRACK_NONE);
// Ensure that any frames currently queued for playback by the compositor
// are removed.
aSink->ClearFrames();
for (size_t i = 0; i < mVideoOutputs.Length(); ++i) {
if (mVideoOutputs[i].mListener == aSink &&
(mVideoOutputs[i].mTrackID == TRACK_ANY ||
mVideoOutputs[i].mTrackID == aID)) {
mVideoOutputs.RemoveElementAt(i);
}
}
RemoveDirectTrackListenerImpl(aSink, aID);
}
void MediaStream::AddVideoOutput(MediaStreamVideoSink* aSink, TrackID aID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamVideoSink* aSink, TrackID aID)
: ControlMessage(aStream), mSink(aSink), mID(aID) {}
void Run() override { mStream->AddVideoOutputImpl(mSink.forget(), mID); }
RefPtr<MediaStreamVideoSink> mSink;
TrackID mID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aSink, aID));
}
void MediaStream::RemoveVideoOutput(MediaStreamVideoSink* aSink, TrackID aID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamVideoSink* aSink, TrackID aID)
: ControlMessage(aStream), mSink(aSink), mID(aID) {}
void Run() override { mStream->RemoveVideoOutputImpl(mSink, mID); }
RefPtr<MediaStreamVideoSink> mSink;
TrackID mID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aSink, aID));
}
void MediaStream::Suspend() {
class Message : public ControlMessage {
public:
explicit Message(MediaStream* aStream) : ControlMessage(aStream) {}
void Run() override {
mStream->GraphImpl()->IncrementSuspendCount(mStream);
}
};
// This can happen if this method has been called asynchronously, and the
// stream has been destroyed since then.
if (mMainThreadDestroyed) {
return;
}
GraphImpl()->AppendMessage(MakeUnique<Message>(this));
}
void MediaStream::Resume() {
class Message : public ControlMessage {
public:
explicit Message(MediaStream* aStream) : ControlMessage(aStream) {}
void Run() override {
mStream->GraphImpl()->DecrementSuspendCount(mStream);
}
};
// This can happen if this method has been called asynchronously, and the
// stream has been destroyed since then.
if (mMainThreadDestroyed) {
return;
}
GraphImpl()->AppendMessage(MakeUnique<Message>(this));
}
void MediaStream::AddTrackListenerImpl(
already_AddRefed<MediaStreamTrackListener> aListener, TrackID aTrackID) {
TrackBound<MediaStreamTrackListener>* l = mTrackListeners.AppendElement();
l->mListener = aListener;
l->mTrackID = aTrackID;
StreamTracks::Track* track = FindTrack(aTrackID);
if (!track) {
return;
}
PrincipalHandle lastPrincipalHandle =
track->GetSegment()->GetLastPrincipalHandle();
l->mListener->NotifyPrincipalHandleChanged(Graph(), lastPrincipalHandle);
if (track->IsEnded() &&
track->GetEnd() <=
GraphTimeToStreamTime(GraphImpl()->mStateComputedTime)) {
l->mListener->NotifyEnded();
}
}
void MediaStream::AddTrackListener(MediaStreamTrackListener* aListener,
TrackID aTrackID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamTrackListener* aListener,
TrackID aTrackID)
: ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {}
void Run() override {
mStream->AddTrackListenerImpl(mListener.forget(), mTrackID);
}
RefPtr<MediaStreamTrackListener> mListener;
TrackID mTrackID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID));
}
void MediaStream::RemoveTrackListenerImpl(MediaStreamTrackListener* aListener,
TrackID aTrackID) {
for (size_t i = 0; i < mTrackListeners.Length(); ++i) {
if (mTrackListeners[i].mListener == aListener &&
mTrackListeners[i].mTrackID == aTrackID) {
mTrackListeners[i].mListener->NotifyRemoved();
mTrackListeners.RemoveElementAt(i);
return;
}
}
}
void MediaStream::RemoveTrackListener(MediaStreamTrackListener* aListener,
TrackID aTrackID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, MediaStreamTrackListener* aListener,
TrackID aTrackID)
: ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {}
void Run() override {
mStream->RemoveTrackListenerImpl(mListener, mTrackID);
}
void RunDuringShutdown() override {
// During shutdown we still want the listener's NotifyRemoved to be
// called, since not doing that might block shutdown of other modules.
Run();
}
RefPtr<MediaStreamTrackListener> mListener;
TrackID mTrackID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID));
}
void MediaStream::AddDirectTrackListenerImpl(
already_AddRefed<DirectMediaStreamTrackListener> aListener,
TrackID aTrackID) {
// Base implementation, for streams that don't support direct track listeners.
RefPtr<DirectMediaStreamTrackListener> listener = aListener;
listener->NotifyDirectListenerInstalled(
DirectMediaStreamTrackListener::InstallationResult::STREAM_NOT_SUPPORTED);
}
void MediaStream::AddDirectTrackListener(
DirectMediaStreamTrackListener* aListener, TrackID aTrackID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, DirectMediaStreamTrackListener* aListener,
TrackID aTrackID)
: ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {}
void Run() override {
mStream->AddDirectTrackListenerImpl(mListener.forget(), mTrackID);
}
RefPtr<DirectMediaStreamTrackListener> mListener;
TrackID mTrackID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID));
}
void MediaStream::RemoveDirectTrackListenerImpl(
DirectMediaStreamTrackListener* aListener, TrackID aTrackID) {
// Base implementation, the listener was never added so nothing to do.
RefPtr<DirectMediaStreamTrackListener> listener = aListener;
}
void MediaStream::RemoveDirectTrackListener(
DirectMediaStreamTrackListener* aListener, TrackID aTrackID) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, DirectMediaStreamTrackListener* aListener,
TrackID aTrackID)
: ControlMessage(aStream), mListener(aListener), mTrackID(aTrackID) {}
void Run() override {
mStream->RemoveDirectTrackListenerImpl(mListener, mTrackID);
}
void RunDuringShutdown() override {
// During shutdown we still want the listener's
// NotifyDirectListenerUninstalled to be called, since not doing that
// might block shutdown of other modules.
Run();
}
RefPtr<DirectMediaStreamTrackListener> mListener;
TrackID mTrackID;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aListener, aTrackID));
}
void MediaStream::RunAfterPendingUpdates(
already_AddRefed<nsIRunnable> aRunnable) {
MOZ_ASSERT(NS_IsMainThread());
MediaStreamGraphImpl* graph = GraphImpl();
nsCOMPtr<nsIRunnable> runnable(aRunnable);
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, already_AddRefed<nsIRunnable> aRunnable)
: ControlMessage(aStream), mRunnable(aRunnable) {}
void Run() override {
mStream->Graph()->DispatchToMainThreadStableState(mRunnable.forget());
}
void RunDuringShutdown() override {
// Don't run mRunnable now as it may call AppendMessage() which would
// assume that there are no remaining controlMessagesToRunDuringShutdown.
MOZ_ASSERT(NS_IsMainThread());
mStream->GraphImpl()->Dispatch(mRunnable.forget());
}
private:
nsCOMPtr<nsIRunnable> mRunnable;
};
graph->AppendMessage(MakeUnique<Message>(this, runnable.forget()));
}
void MediaStream::SetTrackEnabledImpl(TrackID aTrackID,
DisabledTrackMode aMode) {
if (aMode == DisabledTrackMode::ENABLED) {
for (int32_t i = mDisabledTracks.Length() - 1; i >= 0; --i) {
if (aTrackID == mDisabledTracks[i].mTrackID) {
mDisabledTracks.RemoveElementAt(i);
return;
}
}
} else {
for (const DisabledTrack& t : mDisabledTracks) {
if (aTrackID == t.mTrackID) {
NS_ERROR("Changing disabled track mode for a track is not allowed");
return;
}
}
mDisabledTracks.AppendElement(DisabledTrack(aTrackID, aMode));
}
}
DisabledTrackMode MediaStream::GetDisabledTrackMode(TrackID aTrackID) {
for (const DisabledTrack& t : mDisabledTracks) {
if (t.mTrackID == aTrackID) {
return t.mMode;
}
}
return DisabledTrackMode::ENABLED;
}
void MediaStream::SetTrackEnabled(TrackID aTrackID, DisabledTrackMode aMode) {
class Message : public ControlMessage {
public:
Message(MediaStream* aStream, TrackID aTrackID, DisabledTrackMode aMode)
: ControlMessage(aStream), mTrackID(aTrackID), mMode(aMode) {}
void Run() override { mStream->SetTrackEnabledImpl(mTrackID, mMode); }
TrackID mTrackID;
DisabledTrackMode mMode;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aTrackID, aMode));
}
void MediaStream::ApplyTrackDisabling(TrackID aTrackID, MediaSegment* aSegment,
MediaSegment* aRawSegment) {
DisabledTrackMode mode = GetDisabledTrackMode(aTrackID);
if (mode == DisabledTrackMode::ENABLED) {
return;
}
if (mode == DisabledTrackMode::SILENCE_BLACK) {
aSegment->ReplaceWithDisabled();
if (aRawSegment) {
aRawSegment->ReplaceWithDisabled();
}
} else if (mode == DisabledTrackMode::SILENCE_FREEZE) {
aSegment->ReplaceWithNull();
if (aRawSegment) {
aRawSegment->ReplaceWithNull();
}
} else {
MOZ_CRASH("Unsupported mode");
}
}
void MediaStream::AddMainThreadListener(
MainThreadMediaStreamListener* aListener) {
MOZ_ASSERT(NS_IsMainThread());
MOZ_ASSERT(aListener);
MOZ_ASSERT(!mMainThreadListeners.Contains(aListener));
mMainThreadListeners.AppendElement(aListener);
// If it is not yet time to send the notification, then finish here.
if (!mFinishedNotificationSent) {
return;
}
class NotifyRunnable final : public Runnable {
public:
explicit NotifyRunnable(MediaStream* aStream)
: Runnable("MediaStream::NotifyRunnable"), mStream(aStream) {}
NS_IMETHOD Run() override {
MOZ_ASSERT(NS_IsMainThread());
mStream->NotifyMainThreadListeners();
return NS_OK;
}
private:
~NotifyRunnable() {}
RefPtr<MediaStream> mStream;
};
nsCOMPtr<nsIRunnable> runnable = new NotifyRunnable(this);
GraphImpl()->Dispatch(runnable.forget());
}
SourceMediaStream::SourceMediaStream()
: MediaStream(),
mMutex("mozilla::media::SourceMediaStream"),
mFinishPending(false) {}
nsresult SourceMediaStream::OpenAudioInput(CubebUtils::AudioDeviceID aID,
AudioDataListener* aListener) {
MOZ_ASSERT(GraphImpl());
mInputListener = aListener;
return GraphImpl()->OpenAudioInput(aID, aListener);
}
void SourceMediaStream::CloseAudioInput(Maybe<CubebUtils::AudioDeviceID>& aID,
AudioDataListener* aListener) {
MOZ_ASSERT(mInputListener == aListener);
// Destroy() may have run already and cleared this
if (GraphImpl() && mInputListener) {
GraphImpl()->CloseAudioInput(aID, aListener);
}
mInputListener = nullptr;
}
void SourceMediaStream::DestroyImpl() {
Maybe<CubebUtils::AudioDeviceID> id = Nothing();
CloseAudioInput(id, mInputListener);
GraphImpl()->AssertOnGraphThreadOrNotRunning();
for (int32_t i = mConsumers.Length() - 1; i >= 0; --i) {
// Disconnect before we come under mMutex's lock since it can call back
// through RemoveDirectTrackListenerImpl() and deadlock.
mConsumers[i]->Disconnect();
}
// Hold mMutex while mGraph is reset so that other threads holding mMutex
// can null-check know that the graph will not destroyed.
MutexAutoLock lock(mMutex);
MediaStream::DestroyImpl();
}
void SourceMediaStream::SetPullingEnabled(TrackID aTrackID, bool aEnabled) {
class Message : public ControlMessage {
public:
Message(SourceMediaStream* aStream, TrackID aTrackID, bool aEnabled)
: ControlMessage(nullptr),
mStream(aStream),
mTrackID(aTrackID),
mEnabled(aEnabled) {}
void Run() override {
MutexAutoLock lock(mStream->mMutex);
TrackData* data = mStream->FindDataForTrack(mTrackID);
if (!data) {
// We can't enable pulling for a track that was never added. We ignore
// this if we're disabling pulling, since shutdown sequences are
// complex. If there's truly an issue we'll have issues enabling anyway.
MOZ_ASSERT_IF(mEnabled,
mStream->mTracks.FindTrack(mTrackID) &&
mStream->mTracks.FindTrack(mTrackID)->IsEnded());
return;
}
data->mPullingEnabled = mEnabled;
}
SourceMediaStream* mStream;
TrackID mTrackID;
bool mEnabled;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aTrackID, aEnabled));
}
bool SourceMediaStream::PullNewData(GraphTime aDesiredUpToTime) {
TRACE_AUDIO_CALLBACK_COMMENT("SourceMediaStream %p", this);
MutexAutoLock lock(mMutex);
if (mFinished) {
return false;
}
bool streamPullingEnabled = false;
for (const TrackData& track : mUpdateTracks) {
if (!(track.mCommands & TrackEventCommand::TRACK_EVENT_ENDED) &&
track.mPullingEnabled) {
// At least one track in this stream is pulled. We want to consume it in
// real-time (i.e., not block the stream).
streamPullingEnabled = true;
break;
}
}
// Compute how much stream time we'll need assuming we don't block
// the stream at all.
StreamTime t = GraphTimeToStreamTime(aDesiredUpToTime);
for (const TrackData& track : mUpdateTracks) {
if (track.mCommands & TRACK_END) {
continue;
}
StreamTime current;
if (track.mCommands & TRACK_CREATE) {
// This track hasn't been created yet. Use the stream's current time
// (which the track will get as its start time later).
current = GraphTimeToStreamTime(GraphImpl()->mStateComputedTime);
} else {
current = track.mEndOfFlushedData + track.mData->GetDuration();
}
if (t <= current) {
continue;
}
if (!track.mPullingEnabled) {
if (streamPullingEnabled) {
LOG(LogLevel::Verbose,
("%p: Pulling disabled for track but enabled for stream, append "
"null data; stream=%p track=%d t=%f current end=%f",
GraphImpl(), this, track.mID, GraphImpl()->MediaTimeToSeconds(t),
GraphImpl()->MediaTimeToSeconds(current)));
track.mData->AppendNullData(t - current);
}
continue;
}
LOG(LogLevel::Verbose,
("%p: Calling NotifyPull stream=%p track=%d t=%f current end=%f",
GraphImpl(), this, track.mID, GraphImpl()->MediaTimeToSeconds(t),
GraphImpl()->MediaTimeToSeconds(current)));
MutexAutoUnlock unlock(mMutex);
for (TrackBound<MediaStreamTrackListener>& l : mTrackListeners) {
if (l.mTrackID == track.mID) {
l.mListener->NotifyPull(Graph(), current, t);
}
}
}
return true;
}
void SourceMediaStream::ExtractPendingInput(GraphTime aCurrentTime) {
MutexAutoLock lock(mMutex);
bool finished = mFinishPending;
StreamTime streamCurrentTime = GraphTimeToStreamTime(aCurrentTime);
for (int32_t i = mUpdateTracks.Length() - 1; i >= 0; --i) {
SourceMediaStream::TrackData* data = &mUpdateTracks[i];
ApplyTrackDisabling(data->mID, data->mData);
// Dealing with NotifyQueuedTrackChanges and NotifyQueuedAudioData part.
// The logic is different from the manipulating of aStream->mTracks part.
// So it is not combined with the manipulating of aStream->mTracks part.
StreamTime offset =
(data->mCommands & SourceMediaStream::TRACK_CREATE)
? streamCurrentTime
: mTracks.FindTrack(data->mID)->GetSegment()->GetDuration();
for (TrackBound<MediaStreamTrackListener>& b : mTrackListeners) {
if (b.mTrackID != data->mID) {
continue;
}
b.mListener->NotifyQueuedChanges(GraphImpl(), offset, *data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
MediaSegment* segment = data->mData.forget();
LOG(LogLevel::Debug,
("%p: SourceMediaStream %p creating track %d, start %" PRId64
", initial end %" PRId64,
GraphImpl(), this, data->mID, int64_t(streamCurrentTime),
int64_t(segment->GetDuration())));
segment->InsertNullDataAtStart(streamCurrentTime);
data->mEndOfFlushedData += segment->GetDuration();
mTracks.AddTrack(data->mID, streamCurrentTime, segment);
// The track has taken ownership of data->mData, so let's replace
// data->mData with an empty clone.
data->mData = segment->CreateEmptyClone();
data->mCommands &= ~SourceMediaStream::TRACK_CREATE;
} else if (data->mData->GetDuration() > 0) {
MediaSegment* dest = mTracks.FindTrack(data->mID)->GetSegment();
LOG(LogLevel::Verbose,
("%p: SourceMediaStream %p track %d, advancing end from %" PRId64
" to %" PRId64,
GraphImpl(), this, data->mID, int64_t(dest->GetDuration()),
int64_t(dest->GetDuration() + data->mData->GetDuration())));
data->mEndOfFlushedData += data->mData->GetDuration();
dest->AppendFrom(data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_END) {
mTracks.FindTrack(data->mID)->SetEnded();
mUpdateTracks.RemoveElementAt(i);
}
}
if (mTracks.GetEarliestTrackEnd() > 0) {
mHasCurrentData = true;
}
if (finished) {
FinishOnGraphThread();
}
}
void SourceMediaStream::AddTrackInternal(TrackID aID, TrackRate aRate,
MediaSegment* aSegment,
uint32_t aFlags) {
MutexAutoLock lock(mMutex);
nsTArray<TrackData>* track_data =
(aFlags & ADDTRACK_QUEUED) ? &mPendingTracks : &mUpdateTracks;
TrackData* data = track_data->AppendElement();
LOG(LogLevel::Debug,
("%p: AddTrackInternal: %lu/%lu", GraphImpl(),
(long)mPendingTracks.Length(), (long)mUpdateTracks.Length()));
data->mID = aID;
data->mInputRate = aRate;
data->mResamplerChannelCount = 0;
data->mEndOfFlushedData = 0;
data->mCommands = TRACK_CREATE;
data->mData = aSegment;
data->mPullingEnabled = false;
ResampleAudioToGraphSampleRate(data, aSegment);
if (!(aFlags & ADDTRACK_QUEUED) && GraphImpl()) {
GraphImpl()->EnsureNextIteration();
}
}
void SourceMediaStream::AddAudioTrack(TrackID aID, TrackRate aRate,
AudioSegment* aSegment, uint32_t aFlags) {
AddTrackInternal(aID, aRate, aSegment, aFlags);
}
void SourceMediaStream::FinishAddTracks() {
MutexAutoLock lock(mMutex);
mUpdateTracks.AppendElements(std::move(mPendingTracks));
LOG(LogLevel::Debug,
("%p: FinishAddTracks: %lu/%lu", GraphImpl(),
(long)mPendingTracks.Length(), (long)mUpdateTracks.Length()));
if (GraphImpl()) {
GraphImpl()->EnsureNextIteration();
}
}
void SourceMediaStream::ResampleAudioToGraphSampleRate(TrackData* aTrackData,
MediaSegment* aSegment) {
if (aSegment->GetType() != MediaSegment::AUDIO ||
aTrackData->mInputRate == GraphImpl()->GraphRate()) {
return;
}
AudioSegment* segment = static_cast<AudioSegment*>(aSegment);
int channels = segment->ChannelCount();
// If this segment is just silence, we delay instanciating the resampler. We
// also need to recreate the resampler if the channel count changes.
if (channels && aTrackData->mResamplerChannelCount != channels) {
SpeexResamplerState* state = speex_resampler_init(
channels, aTrackData->mInputRate, GraphImpl()->GraphRate(),
SPEEX_RESAMPLER_QUALITY_MIN, nullptr);
if (!state) {
return;
}
aTrackData->mResampler.own(state);
aTrackData->mResamplerChannelCount = channels;
}
segment->ResampleChunks(aTrackData->mResampler, aTrackData->mInputRate,
GraphImpl()->GraphRate());
}
void SourceMediaStream::AdvanceTimeVaryingValuesToCurrentTime(
GraphTime aCurrentTime, GraphTime aBlockedTime) {
MutexAutoLock lock(mMutex);
mTracksStartTime += aBlockedTime;
mStreamTracksStartTimeStamp +=
TimeDuration::FromSeconds(GraphImpl()->MediaTimeToSeconds(aBlockedTime));
mTracks.ForgetUpTo(aCurrentTime - mTracksStartTime);
}
StreamTime SourceMediaStream::AppendToTrack(TrackID aID, MediaSegment* aSegment,
MediaSegment* aRawSegment) {
MutexAutoLock lock(mMutex);
// ::EndAllTrackAndFinished() can end these before the sources notice
StreamTime appended = 0;
auto graph = GraphImpl();
if (!mFinished && graph) {
TrackData* track = FindDataForTrack(aID);
if (track) {
// Data goes into mData, and on the next iteration of the MSG moves
// into the track's segment after NotifyQueuedTrackChanges(). This adds
// 0-10ms of delay before data gets to direct listeners.
// Indirect listeners (via subsequent TrackUnion nodes) are synced to
// playout time, and so can be delayed by buffering.
// Apply track disabling before notifying any consumers directly
// or inserting into the graph
ApplyTrackDisabling(aID, aSegment, aRawSegment);
ResampleAudioToGraphSampleRate(track, aSegment);
// Must notify first, since AppendFrom() will empty out aSegment
NotifyDirectConsumers(track, aRawSegment ? aRawSegment : aSegment);
appended = aSegment->GetDuration();
track->mData->AppendFrom(aSegment); // note: aSegment is now dead
GraphImpl()->EnsureNextIteration();
} else {
aSegment->Clear();
}
}
return appended;
}
void SourceMediaStream::NotifyDirectConsumers(TrackData* aTrack,
MediaSegment* aSegment) {
mMutex.AssertCurrentThreadOwns();
MOZ_ASSERT(aTrack);
for (const TrackBound<DirectMediaStreamTrackListener>& source :
mDirectTrackListeners) {
if (aTrack->mID != source.mTrackID) {
continue;
}
StreamTime offset = 0; // FIX! need a separate StreamTime.... or the end of
// the internal buffer
source.mListener->NotifyRealtimeTrackDataAndApplyTrackDisabling(
Graph(), offset, *aSegment);
}
}
void SourceMediaStream::AddDirectTrackListenerImpl(
already_AddRefed<DirectMediaStreamTrackListener> aListener,
TrackID aTrackID) {
MOZ_ASSERT(IsTrackIDExplicit(aTrackID));
MutexAutoLock lock(mMutex);
RefPtr<DirectMediaStreamTrackListener> listener = aListener;
LOG(LogLevel::Debug, ("%p: Adding direct track listener %p bound to track %d "
"to source stream %p",
GraphImpl(), listener.get(), aTrackID, this));
StreamTracks::Track* track = FindTrack(aTrackID);
if (!track) {
LOG(LogLevel::Warning,
("%p: Couldn't find source track for direct track listener %p",
GraphImpl(), listener.get()));
listener->NotifyDirectListenerInstalled(
DirectMediaStreamTrackListener::InstallationResult::
TRACK_NOT_FOUND_AT_SOURCE);
return;
}
bool isAudio = track->GetType() == MediaSegment::AUDIO;
bool isVideo = track->GetType() == MediaSegment::VIDEO;
if (!isAudio && !isVideo) {
LOG(LogLevel::Warning,
("%p: Source track for direct track listener %p is unknown",
GraphImpl(), listener.get()));
MOZ_ASSERT(false);
return;
}
for (auto entry : mDirectTrackListeners) {
if (entry.mListener == listener &&
(entry.mTrackID == TRACK_ANY || entry.mTrackID == aTrackID)) {
listener->NotifyDirectListenerInstalled(
DirectMediaStreamTrackListener::InstallationResult::ALREADY_EXISTS);
return;
}
}
TrackBound<DirectMediaStreamTrackListener>* sourceListener =
mDirectTrackListeners.AppendElement();
sourceListener->mListener = listener;
sourceListener->mTrackID = aTrackID;
LOG(LogLevel::Debug,
("%p: Added direct track listener %p", GraphImpl(), listener.get()));
listener->NotifyDirectListenerInstalled(
DirectMediaStreamTrackListener::InstallationResult::SUCCESS);
// Pass buffered data to the listener
AudioSegment bufferedAudio;
VideoSegment bufferedVideo;
MediaSegment& bufferedData = isAudio
? static_cast<MediaSegment&>(bufferedAudio)
: static_cast<MediaSegment&>(bufferedVideo);
MediaSegment& trackSegment = *track->GetSegment();
if (mTracks.GetForgottenDuration() < trackSegment.GetDuration()) {
bufferedData.AppendSlice(trackSegment, mTracks.GetForgottenDuration(),
trackSegment.GetDuration());
}
if (TrackData* updateData = FindDataForTrack(aTrackID)) {
bufferedData.AppendSlice(*updateData->mData, 0,
updateData->mData->GetDuration());
}
if (bufferedData.GetDuration() != 0) {
listener->NotifyRealtimeTrackData(Graph(), 0, bufferedData);
}
}
void SourceMediaStream::RemoveDirectTrackListenerImpl(
DirectMediaStreamTrackListener* aListener, TrackID aTrackID) {
MutexAutoLock lock(mMutex);
for (int32_t i = mDirectTrackListeners.Length() - 1; i >= 0; --i) {
const TrackBound<DirectMediaStreamTrackListener>& source =
mDirectTrackListeners[i];
if (source.mListener == aListener && source.mTrackID == aTrackID) {
aListener->NotifyDirectListenerUninstalled();
mDirectTrackListeners.RemoveElementAt(i);
}
}
}
StreamTime SourceMediaStream::GetEndOfAppendedData(TrackID aID) {
MutexAutoLock lock(mMutex);
TrackData* track = FindDataForTrack(aID);
if (track) {
return track->mEndOfFlushedData + track->mData->GetDuration();
}
MOZ_CRASH("Track not found");
return 0;
}
void SourceMediaStream::EndTrack(TrackID aID) {
MutexAutoLock lock(mMutex);
TrackData* track = FindDataForTrack(aID);
if (track) {
track->mCommands |= TrackEventCommand::TRACK_EVENT_ENDED;
}
if (auto graph = GraphImpl()) {
graph->EnsureNextIteration();
}
}
void SourceMediaStream::FinishPendingWithLockHeld() {
mMutex.AssertCurrentThreadOwns();
mFinishPending = true;
if (auto graph = GraphImpl()) {
graph->EnsureNextIteration();
}
}
void SourceMediaStream::SetTrackEnabledImpl(TrackID aTrackID,
DisabledTrackMode aMode) {
{
MutexAutoLock lock(mMutex);
for (TrackBound<DirectMediaStreamTrackListener>& l :
mDirectTrackListeners) {
if (l.mTrackID != aTrackID) {
continue;
}
DisabledTrackMode oldMode = GetDisabledTrackMode(aTrackID);
bool oldEnabled = oldMode == DisabledTrackMode::ENABLED;
if (!oldEnabled && aMode == DisabledTrackMode::ENABLED) {
LOG(LogLevel::Debug, ("%p: SourceMediaStream %p track %d setting "
"direct listener enabled",
GraphImpl(), this, aTrackID));
l.mListener->DecreaseDisabled(oldMode);
} else if (oldEnabled && aMode != DisabledTrackMode::ENABLED) {
LOG(LogLevel::Debug, ("%p: SourceMediaStream %p track %d setting "
"direct listener disabled",
GraphImpl(), this, aTrackID));
l.mListener->IncreaseDisabled(aMode);
}
}
}
MediaStream::SetTrackEnabledImpl(aTrackID, aMode);
}
void SourceMediaStream::EndAllTrackAndFinish() {
MutexAutoLock lock(mMutex);
for (uint32_t i = 0; i < mUpdateTracks.Length(); ++i) {
SourceMediaStream::TrackData* data = &mUpdateTracks[i];
data->mCommands |= TrackEventCommand::TRACK_EVENT_ENDED;
}
mPendingTracks.Clear();
FinishPendingWithLockHeld();
// we will call NotifyEvent() to let GetUserMedia know
}
void SourceMediaStream::RemoveAllDirectListenersImpl() {
GraphImpl()->AssertOnGraphThreadOrNotRunning();
auto directListeners(mDirectTrackListeners);
for (auto& l : directListeners) {
l.mListener->NotifyDirectListenerUninstalled();
}
mDirectTrackListeners.Clear();
}
SourceMediaStream::~SourceMediaStream() {}
bool SourceMediaStream::HasPendingAudioTrack() {
MutexAutoLock lock(mMutex);
bool audioTrackPresent = false;
for (auto& data : mPendingTracks) {
if (data.mData->GetType() == MediaSegment::AUDIO) {
audioTrackPresent = true;
break;
}
}
return audioTrackPresent;
}
void MediaInputPort::Init() {
LOG(LogLevel::Debug, ("%p: Adding MediaInputPort %p (from %p to %p)",
mSource->GraphImpl(), this, mSource, mDest));
mSource->AddConsumer(this);
mDest->AddInput(this);
// mPortCount decremented via MediaInputPort::Destroy's message
++mDest->GraphImpl()->mPortCount;
}
void MediaInputPort::Disconnect() {
GraphImpl()->AssertOnGraphThreadOrNotRunning();
NS_ASSERTION(!mSource == !mDest,
"mSource must either both be null or both non-null");
if (!mSource) return;
mSource->RemoveConsumer(this);
mDest->RemoveInput(this);
mSource = nullptr;
mDest = nullptr;
GraphImpl()->SetStreamOrderDirty();
}
MediaInputPort::InputInterval MediaInputPort::GetNextInputInterval(
GraphTime aTime) const {
InputInterval result = {GRAPH_TIME_MAX, GRAPH_TIME_MAX, false};
if (aTime >= mDest->mStartBlocking) {
return result;
}
result.mStart = aTime;
result.mEnd = mDest->mStartBlocking;
result.mInputIsBlocked = aTime >= mSource->mStartBlocking;
if (!result.mInputIsBlocked) {
result.mEnd = std::min(result.mEnd, mSource->mStartBlocking);
}
return result;
}
void MediaInputPort::Suspended() { mDest->InputSuspended(this); }
void MediaInputPort::Resumed() { mDest->InputResumed(this); }
void MediaInputPort::Destroy() {
class Message : public ControlMessage {
public:
explicit Message(MediaInputPort* aPort)
: ControlMessage(nullptr), mPort(aPort) {}
void Run() override {
mPort->Disconnect();
--mPort->GraphImpl()->mPortCount;
mPort->SetGraphImpl(nullptr);
NS_RELEASE(mPort);
}
void RunDuringShutdown() override { Run(); }
MediaInputPort* mPort;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this));
}
MediaStreamGraphImpl* MediaInputPort::GraphImpl() { return mGraph; }
MediaStreamGraph* MediaInputPort::Graph() { return mGraph; }
void MediaInputPort::SetGraphImpl(MediaStreamGraphImpl* aGraph) {
MOZ_ASSERT(!mGraph || !aGraph, "Should only be set once");
mGraph = aGraph;
}
void MediaInputPort::BlockSourceTrackIdImpl(TrackID aTrackId,
BlockingMode aBlockingMode) {
mBlockedTracks.AppendElement(
Pair<TrackID, BlockingMode>(aTrackId, aBlockingMode));
}
RefPtr<GenericPromise> MediaInputPort::BlockSourceTrackId(
TrackID aTrackId, BlockingMode aBlockingMode) {
class Message : public ControlMessage {
public:
Message(MediaInputPort* aPort, TrackID aTrackId, BlockingMode aBlockingMode,
already_AddRefed<nsIRunnable> aRunnable)
: ControlMessage(aPort->GetDestination()),
mPort(aPort),
mTrackId(aTrackId),
mBlockingMode(aBlockingMode),
mRunnable(aRunnable) {}
void Run() override {
mPort->BlockSourceTrackIdImpl(mTrackId, mBlockingMode);
if (mRunnable) {
mStream->Graph()->DispatchToMainThreadStableState(mRunnable.forget());
}
}
void RunDuringShutdown() override { Run(); }
RefPtr<MediaInputPort> mPort;
TrackID mTrackId;
BlockingMode mBlockingMode;
nsCOMPtr<nsIRunnable> mRunnable;
};
MOZ_ASSERT(IsTrackIDExplicit(aTrackId), "Only explicit TrackID is allowed");
MozPromiseHolder<GenericPromise> holder;
RefPtr<GenericPromise> p = holder.Ensure(__func__);
class HolderRunnable : public Runnable {
public:
explicit HolderRunnable(MozPromiseHolder<GenericPromise>&& aHolder)
: Runnable("MediaInputPort::HolderRunnable"),
mHolder(std::move(aHolder)) {}
NS_IMETHOD Run() override {
MOZ_ASSERT(NS_IsMainThread());
mHolder.Resolve(true, __func__);
return NS_OK;
}
private:
~HolderRunnable() {
mHolder.RejectIfExists(NS_ERROR_DOM_MEDIA_CANCELED, __func__);
}
MozPromiseHolder<GenericPromise> mHolder;
};
auto runnable = MakeRefPtr<HolderRunnable>(std::move(holder));
GraphImpl()->AppendMessage(
MakeUnique<Message>(this, aTrackId, aBlockingMode, runnable.forget()));
return p;
}
already_AddRefed<MediaInputPort> ProcessedMediaStream::AllocateInputPort(
MediaStream* aStream, TrackID aTrackID, TrackID aDestTrackID,
uint16_t aInputNumber, uint16_t aOutputNumber,
nsTArray<TrackID>* aBlockedTracks) {
// This method creates two references to the MediaInputPort: one for
// the main thread, and one for the MediaStreamGraph.
class Message : public ControlMessage {
public:
explicit Message(MediaInputPort* aPort)
: ControlMessage(aPort->GetDestination()), mPort(aPort) {}
void Run() override {
mPort->Init();
// The graph holds its reference implicitly
mPort->GraphImpl()->SetStreamOrderDirty();
Unused << mPort.forget();
}
void RunDuringShutdown() override { Run(); }
RefPtr<MediaInputPort> mPort;
};
MOZ_ASSERT(aStream->GraphImpl() == GraphImpl());
MOZ_ASSERT(aTrackID == TRACK_ANY || IsTrackIDExplicit(aTrackID),
"Only TRACK_ANY and explicit ID are allowed for source track");
MOZ_ASSERT(
aDestTrackID == TRACK_ANY || IsTrackIDExplicit(aDestTrackID),
"Only TRACK_ANY and explicit ID are allowed for destination track");
MOZ_ASSERT(
aTrackID != TRACK_ANY || aDestTrackID == TRACK_ANY,
"Generic MediaInputPort cannot produce a single destination track");
RefPtr<MediaInputPort> port = new MediaInputPort(
aStream, aTrackID, this, aDestTrackID, aInputNumber, aOutputNumber);
if (aBlockedTracks) {
for (TrackID trackID : *aBlockedTracks) {
port->BlockSourceTrackIdImpl(trackID, BlockingMode::CREATION);
}
}
port->SetGraphImpl(GraphImpl());
GraphImpl()->AppendMessage(MakeUnique<Message>(port));
return port.forget();
}
void ProcessedMediaStream::QueueSetAutofinish(bool aAutofinish) {
class Message : public ControlMessage {
public:
Message(ProcessedMediaStream* aStream, bool aAutofinish)
: ControlMessage(aStream), mAutofinish(aAutofinish) {}
void Run() override {
static_cast<ProcessedMediaStream*>(mStream)->SetAutofinishImpl(
mAutofinish);
}
bool mAutofinish;
};
GraphImpl()->AppendMessage(MakeUnique<Message>(this, aAutofinish));
}
void ProcessedMediaStream::DestroyImpl() {
for (int32_t i = mInputs.Length() - 1; i >= 0; --i) {
mInputs[i]->Disconnect();
}
for (int32_t i = mSuspendedInputs.Length() - 1; i >= 0; --i) {
mSuspendedInputs[i]->Disconnect();
}
MediaStream::DestroyImpl();
// The stream order is only important if there are connections, in which
// case MediaInputPort::Disconnect() called SetStreamOrderDirty().
// MediaStreamGraphImpl::RemoveStreamGraphThread() will also call
// SetStreamOrderDirty(), for other reasons.
}
MediaStreamGraphImpl::MediaStreamGraphImpl(GraphDriverType aDriverRequested,
GraphRunType aRunTypeRequested,
TrackRate aSampleRate,
AbstractThread* aMainThread)
: MediaStreamGraph(aSampleRate),
mGraphRunner(aRunTypeRequested == SINGLE_THREAD ? new GraphRunner(this)
: nullptr),
mFirstCycleBreaker(0)
// An offline graph is not initially processing.
,
mEndTime(aDriverRequested == OFFLINE_THREAD_DRIVER ? 0 : GRAPH_TIME_MAX),
mPortCount(0),
mInputDeviceID(nullptr),
mOutputDeviceID(nullptr),
mNeedAnotherIteration(false),
mGraphDriverAsleep(false),
mMonitor("MediaStreamGraphImpl"),
mLifecycleState(LIFECYCLE_THREAD_NOT_STARTED),
mForceShutDown(false),
mPostedRunInStableStateEvent(false),
mDetectedNotRunning(false),
mPostedRunInStableState(false),
mRealtime(aDriverRequested != OFFLINE_THREAD_DRIVER),
mStreamOrderDirty(false),
mAbstractMainThread(aMainThread),
mSelfRef(this),
mOutputChannels(std::min<uint32_t>(8, CubebUtils::MaxNumberOfChannels())),
mGlobalVolume(CubebUtils::GetVolumeScale())
#ifdef DEBUG
,
mCanRunMessagesSynchronously(false)
#endif
,
mMainThreadGraphTime(0, "MediaStreamGraphImpl::mMainThreadGraphTime") {
if (mRealtime) {
if (aDriverRequested == AUDIO_THREAD_DRIVER) {
// Always start with zero input channels.
mDriver = new AudioCallbackDriver(this, 0);
} else {
mDriver = new SystemClockDriver(this);
}
#ifdef TRACING
// This is a noop if the logger has not been enabled.
gMSGTraceLogger.Start();
gMSGTraceLogger.Log("[");
#endif
} else {
mDriver = new OfflineClockDriver(this, MEDIA_GRAPH_TARGET_PERIOD_MS);
}
mLastMainThreadUpdate = TimeStamp::Now();
RegisterWeakAsyncMemoryReporter(this);
}
AbstractThread* MediaStreamGraph::AbstractMainThread() {
MOZ_ASSERT(static_cast<MediaStreamGraphImpl*>(this)->mAbstractMainThread);
return static_cast<MediaStreamGraphImpl*>(this)->mAbstractMainThread;
}
#ifdef DEBUG
bool MediaStreamGraphImpl::RunByGraphDriver(GraphDriver* aDriver) {
return aDriver->OnThread() ||
(mGraphRunner && mGraphRunner->RunByGraphDriver(aDriver));
}
#endif
void MediaStreamGraphImpl::Destroy() {
// First unregister from memory reporting.
UnregisterWeakMemoryReporter(this);
// Clear the self reference which will destroy this instance if all
// associated GraphDrivers are destroyed.
mSelfRef = nullptr;
}
static uint32_t WindowToHash(nsPIDOMWindowInner* aWindow,
TrackRate aSampleRate) {
uint32_t hashkey = 0;
hashkey = AddToHash(hashkey, aWindow);
hashkey = AddToHash(hashkey, aSampleRate);
return hashkey;
}
MediaStreamGraph* MediaStreamGraph::GetInstanceIfExists(
nsPIDOMWindowInner* aWindow, TrackRate aSampleRate) {
MOZ_ASSERT(NS_IsMainThread(), "Main thread only");
TrackRate sampleRate =
aSampleRate ? aSampleRate : CubebUtils::PreferredSampleRate();
uint32_t hashkey = WindowToHash(aWindow, sampleRate);
MediaStreamGraphImpl* graph = nullptr;
gGraphs.Get(hashkey, &graph);
return graph;
}
MediaStreamGraph* MediaStreamGraph::GetInstance(
MediaStreamGraph::GraphDriverType aGraphDriverRequested,
nsPIDOMWindowInner* aWindow, TrackRate aSampleRate) {
MOZ_ASSERT(NS_IsMainThread(), "Main thread only");
TrackRate sampleRate =
aSampleRate ? aSampleRate : CubebUtils::PreferredSampleRate();
MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(
GetInstanceIfExists(aWindow, sampleRate));
if (!graph) {
if (!gMediaStreamGraphShutdownBlocker) {
class Blocker : public media::ShutdownBlocker {
public:
Blocker()
: media::ShutdownBlocker(NS_LITERAL_STRING(
"MediaStreamGraph shutdown: blocking on msg thread")) {}
NS_IMETHOD
BlockShutdown(nsIAsyncShutdownClient* aProfileBeforeChange) override {
// Distribute the global async shutdown blocker in a ticket. If there
// are zero graphs then shutdown is unblocked when we go out of scope.
auto ticket = MakeRefPtr<media::ShutdownTicket>(
gMediaStreamGraphShutdownBlocker.get());
gMediaStreamGraphShutdownBlocker = nullptr;
for (auto iter = gGraphs.Iter(); !iter.Done(); iter.Next()) {
iter.UserData()->ForceShutDown(ticket);
}
return NS_OK;
}
};
gMediaStreamGraphShutdownBlocker = new Blocker();
nsCOMPtr<nsIAsyncShutdownClient> barrier = media::GetShutdownBarrier();
nsresult rv = barrier->AddBlocker(
gMediaStreamGraphShutdownBlocker, NS_LITERAL_STRING(__FILE__),
__LINE__, NS_LITERAL_STRING("MediaStreamGraph shutdown"));
MOZ_RELEASE_ASSERT(NS_SUCCEEDED(rv));
}
AbstractThread* mainThread;
if (aWindow) {
mainThread =
aWindow->AsGlobal()->AbstractMainThreadFor(TaskCategory::Other);
} else {
// Uncommon case, only for some old configuration of webspeech.
mainThread = AbstractThread::MainThread();
}
GraphRunType runType = DIRECT_DRIVER;
if (aGraphDriverRequested != OFFLINE_THREAD_DRIVER &&
Preferences::GetBool("dom.audioworklet.enabled", false)) {
runType = SINGLE_THREAD;
}
graph = new MediaStreamGraphImpl(aGraphDriverRequested, runType, sampleRate,
mainThread);
uint32_t hashkey = WindowToHash(aWindow, sampleRate);
gGraphs.Put(hashkey, graph);
LOG(LogLevel::Debug,
("Starting up MediaStreamGraph %p for window %p", graph, aWindow));
}
return graph;
}
MediaStreamGraph* MediaStreamGraph::CreateNonRealtimeInstance(
TrackRate aSampleRate, nsPIDOMWindowInner* aWindow) {
MOZ_ASSERT(NS_IsMainThread(), "Main thread only");
MediaStreamGraphImpl* graph = new MediaStreamGraphImpl(
OFFLINE_THREAD_DRIVER, DIRECT_DRIVER, aSampleRate,
aWindow->AsGlobal()->AbstractMainThreadFor(TaskCategory::Other));
LOG(LogLevel::Debug, ("Starting up Offline MediaStreamGraph %p", graph));
return graph;
}
void MediaStreamGraph::DestroyNonRealtimeInstance(MediaStreamGraph* aGraph) {
MOZ_ASSERT(NS_IsMainThread(), "Main thread only");
MOZ_ASSERT(aGraph->IsNonRealtime(),
"Should not destroy the global graph here");
MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(aGraph);
graph->ForceShutDown(nullptr);
}
NS_IMPL_ISUPPORTS(MediaStreamGraphImpl, nsIMemoryReporter, nsITimerCallback,
nsINamed)
NS_IMETHODIMP
MediaStreamGraphImpl::CollectReports(nsIHandleReportCallback* aHandleReport,
nsISupports* aData, bool aAnonymize) {
MOZ_ASSERT(NS_IsMainThread());
{
MonitorAutoLock mon(mMonitor);
if (LifecycleStateRef() >= LIFECYCLE_WAITING_FOR_THREAD_SHUTDOWN) {
// Shutting down, nothing to report.
FinishCollectReports(aHandleReport, aData, nsTArray<AudioNodeSizes>());
return NS_OK;
}
}
class Message final : public ControlMessage {
public:
Message(MediaStreamGraphImpl* aGraph,
nsIHandleReportCallback* aHandleReport, nsISupports* aHandlerData)
: ControlMessage(nullptr),
mGraph(aGraph),
mHandleReport(aHandleReport),
mHandlerData(aHandlerData) {}
void Run() override {
mGraph->CollectSizesForMemoryReport(mHandleReport.forget(),
mHandlerData.forget());
}
void RunDuringShutdown() override {
// Run this message during shutdown too, so that endReports is called.
Run();
}
MediaStreamGraphImpl* mGraph;
// nsMemoryReporterManager keeps the callback and data alive only if it
// does not time out.
nsCOMPtr<nsIHandleReportCallback> mHandleReport;
nsCOMPtr<nsISupports> mHandlerData;
};
AppendMessage(MakeUnique<Message>(this, aHandleReport, aData));
return NS_OK;
}
void MediaStreamGraphImpl::CollectSizesForMemoryReport(
already_AddRefed<nsIHandleReportCallback> aHandleReport,
already_AddRefed<nsISupports> aHandlerData) {
class FinishCollectRunnable final : public Runnable {
public:
explicit FinishCollectRunnable(
already_AddRefed<nsIHandleReportCallback> aHandleReport,
already_AddRefed<nsISupports> aHandlerData)
: mozilla::Runnable("FinishCollectRunnable"),
mHandleReport(aHandleReport),
mHandlerData(aHandlerData) {}
NS_IMETHOD Run() override {
MediaStreamGraphImpl::FinishCollectReports(mHandleReport, mHandlerData,
std::move(mAudioStreamSizes));
return NS_OK;
}
nsTArray<AudioNodeSizes> mAudioStreamSizes;
private:
~FinishCollectRunnable() {}
// Avoiding nsCOMPtr because NSCAP_ASSERT_NO_QUERY_NEEDED in its
// constructor modifies the ref-count, which cannot be done off main
// thread.
RefPtr<nsIHandleReportCallback> mHandleReport;
RefPtr<nsISupports> mHandlerData;
};
RefPtr<FinishCollectRunnable> runnable = new FinishCollectRunnable(
std::move(aHandleReport), std::move(aHandlerData));
auto audioStreamSizes = &runnable->mAudioStreamSizes;
for (MediaStream* s : AllStreams()) {
AudioNodeStream* stream = s->AsAudioNodeStream();
if (stream) {
AudioNodeSizes* usage = audioStreamSizes->AppendElement();
stream->SizeOfAudioNodesIncludingThis(MallocSizeOf, *usage);
}
}
mAbstractMainThread->Dispatch(runnable.forget());
}
void MediaStreamGraphImpl::FinishCollectReports(
nsIHandleReportCallback* aHandleReport, nsISupports* aData,
const nsTArray<AudioNodeSizes>& aAudioStreamSizes) {
MOZ_ASSERT(NS_IsMainThread());
nsCOMPtr<nsIMemoryReporterManager> manager =
do_GetService("@mozilla.org/memory-reporter-manager;1");
if (!manager) return;
#define REPORT(_path, _amount, _desc) \
aHandleReport->Callback(EmptyCString(), _path, KIND_HEAP, UNITS_BYTES, \
_amount, NS_LITERAL_CSTRING(_desc), aData);
for (size_t i = 0; i < aAudioStreamSizes.Length(); i++) {
const AudioNodeSizes& usage = aAudioStreamSizes[i];
const char* const nodeType =
usage.mNodeType ? usage.mNodeType : "<unknown>";
nsPrintfCString enginePath("explicit/webaudio/audio-node/%s/engine-objects",
nodeType);
REPORT(enginePath, usage.mEngine,
"Memory used by AudioNode engine objects (Web Audio).");
nsPrintfCString streamPath("explicit/webaudio/audio-node/%s/stream-objects",
nodeType);
REPORT(streamPath, usage.mStream,
"Memory used by AudioNode stream objects (Web Audio).");
}
size_t hrtfLoaders = WebCore::HRTFDatabaseLoader::sizeOfLoaders(MallocSizeOf);
if (hrtfLoaders) {
REPORT(NS_LITERAL_CSTRING(
"explicit/webaudio/audio-node/PannerNode/hrtf-databases"),
hrtfLoaders, "Memory used by PannerNode databases (Web Audio).");
}
#undef REPORT
manager->EndReport();
}
SourceMediaStream* MediaStreamGraph::CreateSourceStream() {
SourceMediaStream* stream = new SourceMediaStream();
AddStream(stream);
return stream;
}
ProcessedMediaStream* MediaStreamGraph::CreateTrackUnionStream() {
TrackUnionStream* stream = new TrackUnionStream();
AddStream(stream);
return stream;
}
ProcessedMediaStream* MediaStreamGraph::CreateAudioCaptureStream(
TrackID aTrackId) {
AudioCaptureStream* stream = new AudioCaptureStream(aTrackId);
AddStream(stream);
return stream;
}
void MediaStreamGraph::AddStream(MediaStream* aStream) {
NS_ADDREF(aStream);
MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(this);
aStream->SetGraphImpl(graph);
graph->AppendMessage(MakeUnique<CreateMessage>(aStream));
}
class GraphStartedRunnable final : public Runnable {
public:
GraphStartedRunnable(AudioNodeStream* aStream, MediaStreamGraph* aGraph)
: Runnable("GraphStartedRunnable"), mStream(aStream), mGraph(aGraph) {}
NS_IMETHOD Run() override {
mGraph->NotifyWhenGraphStarted(mStream);
return NS_OK;
}
private:
RefPtr<AudioNodeStream> mStream;
MediaStreamGraph* mGraph;
};
void MediaStreamGraph::NotifyWhenGraphStarted(AudioNodeStream* aStream) {
MOZ_ASSERT(NS_IsMainThread());
class GraphStartedNotificationControlMessage : public ControlMessage {
public:
explicit GraphStartedNotificationControlMessage(AudioNodeStream* aStream)
: ControlMessage(aStream) {}
void Run() override {
// This runs on the graph thread, so when this runs, and the current
// driver is an AudioCallbackDriver, we know the audio hardware is
// started. If not, we are going to switch soon, keep reposting this
// ControlMessage.
MediaStreamGraphImpl* graphImpl = mStream->GraphImpl();
if (graphImpl->CurrentDriver()->AsAudioCallbackDriver()) {
nsCOMPtr<nsIRunnable> event = new dom::StateChangeTask(
mStream->AsAudioNodeStream(), nullptr, AudioContextState::Running);
graphImpl->Dispatch(event.forget());
} else {
nsCOMPtr<nsIRunnable> event = new GraphStartedRunnable(
mStream->AsAudioNodeStream(), mStream->Graph());
graphImpl->Dispatch(event.forget());
}
}
void RunDuringShutdown() override {}
};
if (!aStream->IsDestroyed()) {
MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this);
graphImpl->AppendMessage(
MakeUnique<GraphStartedNotificationControlMessage>(aStream));
}
}
void MediaStreamGraphImpl::IncrementSuspendCount(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
if (!aStream->IsSuspended()) {
MOZ_ASSERT(mStreams.Contains(aStream));
mStreams.RemoveElement(aStream);
mSuspendedStreams.AppendElement(aStream);
SetStreamOrderDirty();
}
aStream->IncrementSuspendCount();
}
void MediaStreamGraphImpl::DecrementSuspendCount(MediaStream* aStream) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
bool wasSuspended = aStream->IsSuspended();
aStream->DecrementSuspendCount();
if (wasSuspended && !aStream->IsSuspended()) {
MOZ_ASSERT(mSuspendedStreams.Contains(aStream));
mSuspendedStreams.RemoveElement(aStream);
mStreams.AppendElement(aStream);
ProcessedMediaStream* ps = aStream->AsProcessedStream();
if (ps) {
ps->mCycleMarker = NOT_VISITED;
}
SetStreamOrderDirty();
}
}
void MediaStreamGraphImpl::SuspendOrResumeStreams(
AudioContextOperation aAudioContextOperation,
const nsTArray<MediaStream*>& aStreamSet) {
MOZ_ASSERT(OnGraphThreadOrNotRunning());
// For our purpose, Suspend and Close are equivalent: we want to remove the
// streams from the set of streams that are going to be processed.
for (MediaStream* stream : aStreamSet) {
if (aAudioContextOperation == AudioContextOperation::Resume) {
DecrementSuspendCount(stream);
} else {
IncrementSuspendCount(stream);
}
}
LOG(LogLevel::Debug, ("Moving streams between suspended and running"
"state: mStreams: %zu, mSuspendedStreams: %zu",
mStreams.Length(), mSuspendedStreams.Length()));
#ifdef DEBUG
// The intersection of the two arrays should be null.
for (uint32_t i = 0; i < mStreams.Length(); i++) {
for (uint32_t j = 0; j < mSuspendedStreams.Length(); j++) {
MOZ_ASSERT(
mStreams[i] != mSuspendedStreams[j],
"The suspended stream set and running stream set are not disjoint.");
}
}
#endif
}
void MediaStreamGraphImpl::AudioContextOperationCompleted(
MediaStream* aStream, void* aPromise, AudioContextOperation aOperation) {
// This can be called from the thread created to do cubeb operation, or the
// MSG thread. The pointers passed back here are refcounted, so are still
// alive.
AudioContextState state;
switch (aOperation) {
case AudioContextOperation::Suspend:
state = AudioContextState::Suspended;
break;
case AudioContextOperation::Resume:
state = AudioContextState::Running;
break;
case AudioContextOperation::Close:
state = AudioContextState::Closed;
break;
default:
MOZ_CRASH("Not handled.");
}
nsCOMPtr<nsIRunnable> event =
new dom::StateChangeTask(aStream->AsAudioNodeStream(), aPromise, state);
mAbstractMainThread->Dispatch(event.forget());
}
void MediaStreamGraphImpl::ApplyAudioContextOperationImpl(
MediaStream* aDestinationStream, const nsTArray<MediaStream*>& aStreams,
AudioContextOperation aOperation, void* aPromise) {
MOZ_ASSERT(OnGraphThread());
SuspendOrResumeStreams(aOperation, aStreams);
bool switching = false;
GraphDriver* nextDriver = nullptr;
{
MonitorAutoLock lock(mMonitor);
switching = CurrentDriver()->Switching();
if (switching) {
nextDriver = CurrentDriver()->NextDriver();
}
}
// If we have suspended the last AudioContext, and we don't have other
// streams that have audio, this graph will automatically switch to a
// SystemCallbackDriver, because it can't find a MediaStream that has an audio
// track. When resuming, force switching to an AudioCallbackDriver (if we're
// not already switching). It would have happened at the next iteration
// anyways, but doing this now save some time.
if (aOperation == AudioContextOperation::Resume) {
if (!CurrentDriver()->AsAudioCallbackDriver()) {
AudioCallbackDriver* driver;
if (switching) {
MOZ_ASSERT(nextDriver->AsAudioCallbackDriver());
driver = nextDriver->AsAudioCallbackDriver();
} else {
driver = new AudioCallbackDriver(this, AudioInputChannelCount());
MonitorAutoLock lock(mMonitor);
CurrentDriver()->SwitchAtNextIteration(driver);
}
driver->EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise,
aOperation);
} else {
// We are resuming a context, but we are already using an
// AudioCallbackDriver, we can resolve the promise now.
AudioContextOperationCompleted(aDestinationStream, aPromise, aOperation);
}
}
// Close, suspend: check if we are going to switch to a
// SystemAudioCallbackDriver, and pass the promise to the AudioCallbackDriver
// if that's the case, so it can notify the content.
// This is the same logic as in UpdateStreamOrder, but it's simpler to have it
// here as well so we don't have to store the Promise(s) on the Graph.
if (aOperation != AudioContextOperation::Resume) {
bool audioTrackPresent = AudioTrackPresent();
if (!audioTrackPresent && CurrentDriver()->AsAudioCallbackDriver()) {
CurrentDriver()
->AsAudioCallbackDriver()
->EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise,
aOperation);
SystemClockDriver* driver;
if (nextDriver) {
MOZ_ASSERT(!nextDriver->AsAudioCallbackDriver());
} else {
driver = new SystemClockDriver(this);
MonitorAutoLock lock(mMonitor);
CurrentDriver()->SwitchAtNextIteration(driver);
}
// We are closing or suspending an AudioContext, but we just got resumed.
// Queue the operation on the next driver so that the ordering is
// preserved.
} else if (!audioTrackPresent && switching) {
MOZ_ASSERT(nextDriver->AsAudioCallbackDriver() ||
nextDriver->AsSystemClockDriver()->IsFallback());
if (nextDriver->AsAudioCallbackDriver()) {
nextDriver->AsAudioCallbackDriver()
->EnqueueStreamAndPromiseForOperation(aDestinationStream, aPromise,
aOperation);
} else {
// If this is not an AudioCallbackDriver, this means we failed opening
// an AudioCallbackDriver in the past, and we're constantly trying to
// re-open an new audio stream, but are running this graph that has an
// audio track off a SystemClockDriver for now to keep things moving.
// This is the case where we're trying to switch an an system driver
// (because suspend or close have been called on an AudioContext, or
// we've closed the page), but we're already running one. We can just
// resolve the promise now: we're already running off a system thread.
AudioContextOperationCompleted(aDestinationStream, aPromise,
aOperation);
}
} else {
// We are closing or suspending an AudioContext, but something else is
// using the audio stream, we can resolve the promise now.
AudioContextOperationCompleted(aDestinationStream, aPromise, aOperation);
}
}
}
void MediaStreamGraph::ApplyAudioContextOperation(
MediaStream* aDestinationStream, const nsTArray<MediaStream*>& aStreams,
AudioContextOperation aOperation, void* aPromise) {
class AudioContextOperationControlMessage : public ControlMessage {
public:
AudioContextOperationControlMessage(MediaStream* aDestinationStream,
const nsTArray<MediaStream*>& aStreams,
AudioContextOperation aOperation,
void* aPromise)
: ControlMessage(aDestinationStream),
mStreams(aStreams),
mAudioContextOperation(aOperation),
mPromise(aPromise) {}
void Run() override {
mStream->GraphImpl()->ApplyAudioContextOperationImpl(
mStream, mStreams, mAudioContextOperation, mPromise);
}
void RunDuringShutdown() override {
MOZ_ASSERT(mAudioContextOperation == AudioContextOperation::Close,
"We should be reviving the graph?");
}
private:
// We don't need strong references here for the same reason ControlMessage
// doesn't.
nsTArray<MediaStream*> mStreams;
AudioContextOperation mAudioContextOperation;
void* mPromise;
};
MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this);
graphImpl->AppendMessage(MakeUnique<AudioContextOperationControlMessage>(
aDestinationStream, aStreams, aOperation, aPromise));
}
bool MediaStreamGraph::IsNonRealtime() const {
return !static_cast<const MediaStreamGraphImpl*>(this)->mRealtime;
}
void MediaStreamGraph::StartNonRealtimeProcessing(uint32_t aTicksToProcess) {
MOZ_ASSERT(NS_IsMainThread(), "main thread only");
MediaStreamGraphImpl* graph = static_cast<MediaStreamGraphImpl*>(this);
NS_ASSERTION(!graph->mRealtime, "non-realtime only");
class Message : public ControlMessage {
public:
explicit Message(MediaStreamGraphImpl* aGraph, uint32_t aTicksToProcess)
: ControlMessage(nullptr),
mGraph(aGraph),
mTicksToProcess(aTicksToProcess) {}
void Run() override {
MOZ_ASSERT(mGraph->mEndTime == 0,
"StartNonRealtimeProcessing should be called only once");
mGraph->mEndTime = mGraph->RoundUpToEndOfAudioBlock(
mGraph->mStateComputedTime + mTicksToProcess);
}
// The graph owns this message.
MediaStreamGraphImpl* MOZ_NON_OWNING_REF mGraph;
uint32_t mTicksToProcess;
};
graph->AppendMessage(MakeUnique<Message>(graph, aTicksToProcess));
}
void ProcessedMediaStream::AddInput(MediaInputPort* aPort) {
MediaStream* s = aPort->GetSource();
if (!s->IsSuspended()) {
mInputs.AppendElement(aPort);
} else {
mSuspendedInputs.AppendElement(aPort);
}
GraphImpl()->SetStreamOrderDirty();
}
void ProcessedMediaStream::InputSuspended(MediaInputPort* aPort) {
GraphImpl()->AssertOnGraphThreadOrNotRunning();
mInputs.RemoveElement(aPort);
mSuspendedInputs.AppendElement(aPort);
GraphImpl()->SetStreamOrderDirty();
}
void ProcessedMediaStream::InputResumed(MediaInputPort* aPort) {
GraphImpl()->AssertOnGraphThreadOrNotRunning();
mSuspendedInputs.RemoveElement(aPort);
mInputs.AppendElement(aPort);
GraphImpl()->SetStreamOrderDirty();
}
void MediaStreamGraph::RegisterCaptureStreamForWindow(
uint64_t aWindowId, ProcessedMediaStream* aCaptureStream) {
MOZ_ASSERT(NS_IsMainThread());
MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this);
graphImpl->RegisterCaptureStreamForWindow(aWindowId, aCaptureStream);
}
void MediaStreamGraphImpl::RegisterCaptureStreamForWindow(
uint64_t aWindowId, ProcessedMediaStream* aCaptureStream) {
MOZ_ASSERT(NS_IsMainThread());
WindowAndStream winAndStream;
winAndStream.mWindowId = aWindowId;
winAndStream.mCaptureStreamSink = aCaptureStream;
mWindowCaptureStreams.AppendElement(winAndStream);
}
void MediaStreamGraph::UnregisterCaptureStreamForWindow(uint64_t aWindowId) {
MOZ_ASSERT(NS_IsMainThread());
MediaStreamGraphImpl* graphImpl = static_cast<MediaStreamGraphImpl*>(this);
graphImpl->UnregisterCaptureStreamForWindow(aWindowId);
}
void MediaStreamGraphImpl::UnregisterCaptureStreamForWindow(
uint64_t aWindowId) {
MOZ_ASSERT(NS_IsMainThread());
for (int32_t i = mWindowCaptureStreams.Length() - 1; i >= 0; i--) {
if (mWindowCaptureStreams[i].mWindowId == aWindowId) {
mWindowCaptureStreams.RemoveElementAt(i);
}
}
}
already_AddRefed<MediaInputPort> MediaStreamGraph::ConnectToCaptureStream(
uint64_t aWindowId, MediaStream* aMediaStream) {
return aMediaStream->GraphImpl()->ConnectToCaptureStream(aWindowId,
aMediaStream);
}
already_AddRefed<MediaInputPort> MediaStreamGraphImpl::ConnectToCaptureStream(
uint64_t aWindowId, MediaStream* aMediaStream) {
MOZ_ASSERT(NS_IsMainThread());
for (uint32_t i = 0; i < mWindowCaptureStreams.Length(); i++) {
if (mWindowCaptureStreams[i].mWindowId == aWindowId) {
ProcessedMediaStream* sink = mWindowCaptureStreams[i].mCaptureStreamSink;
return sink->AllocateInputPort(aMediaStream);
}
}
return nullptr;
}
void MediaStreamGraph::DispatchToMainThreadStableState(
already_AddRefed<nsIRunnable> aRunnable) {
AssertOnGraphThreadOrNotRunning();
static_cast<MediaStreamGraphImpl*>(this)
->mPendingUpdateRunnables.AppendElement(std::move(aRunnable));
}
Watchable<mozilla::GraphTime>& MediaStreamGraphImpl::CurrentTime() {
MOZ_ASSERT(NS_IsMainThread());
return mMainThreadGraphTime;
}
} // namespace mozilla