Bug 1404997 - P13. Move ExtractPendingInput logic to SourceMediaStream. r=padenot

The MSG shouldn't have to know about the inner details of the SourceMediaStream

MozReview-Commit-ID: 2S81SPzy09E

--HG--
extra : rebase_source : dff8384b19442e7686cef42420372e39f10094b6
This commit is contained in:
Jean-Yves Avenard 2017-12-06 16:55:56 +01:00
Родитель d9f13cf692
Коммит 55065b688d
2 изменённых файлов: 172 добавлений и 140 удалений

Просмотреть файл

@ -178,147 +178,8 @@ MediaStreamGraphImpl::ExtractPendingInput(SourceMediaStream* aStream,
bool* aEnsureNextIteration)
{
MOZ_ASSERT(OnGraphThread());
bool finished;
{
MutexAutoLock lock(aStream->mMutex);
if (aStream->mPullEnabled && !aStream->mFinished &&
!aStream->mListeners.IsEmpty()) {
// Compute how much stream time we'll need assuming we don't block
// the stream at all.
StreamTime t = aStream->GraphTimeToStreamTime(aDesiredUpToTime);
StreamTime current = aStream->mTracks.GetEnd();
LOG(LogLevel::Verbose,
("Calling NotifyPull aStream=%p t=%f current end=%f",
aStream,
MediaTimeToSeconds(t),
MediaTimeToSeconds(current)));
if (t > current) {
*aEnsureNextIteration = true;
#ifdef DEBUG
if (aStream->mListeners.Length() == 0) {
LOG(
LogLevel::Error,
("No listeners in NotifyPull aStream=%p desired=%f current end=%f",
aStream,
MediaTimeToSeconds(t),
MediaTimeToSeconds(current)));
aStream->DumpTrackInfo();
}
#endif
for (uint32_t j = 0; j < aStream->mListeners.Length(); ++j) {
MediaStreamListener* l = aStream->mListeners[j];
{
MutexAutoUnlock unlock(aStream->mMutex);
l->NotifyPull(this, t);
}
}
}
}
finished = aStream->mUpdateFinished;
bool shouldNotifyTrackCreated = false;
for (int32_t i = aStream->mUpdateTracks.Length() - 1; i >= 0; --i) {
SourceMediaStream::TrackData* data = &aStream->mUpdateTracks[i];
aStream->ApplyTrackDisabling(data->mID, data->mData);
// Dealing with NotifyQueuedTrackChanges and NotifyQueuedAudioData part.
// The logic is different from the manipulating of aStream->mTracks part.
// So it is not combined with the manipulating of aStream->mTracks part.
StreamTime offset =
(data->mCommands & SourceMediaStream::TRACK_CREATE)
? data->mStart
: aStream->mTracks.FindTrack(data->mID)->GetSegment()->GetDuration();
// Audio case.
if (data->mData->GetType() == MediaSegment::AUDIO) {
if (data->mCommands) {
MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED));
for (MediaStreamListener* l : aStream->mListeners) {
if (data->mCommands & SourceMediaStream::TRACK_END) {
l->NotifyQueuedAudioData(this, data->mID,
offset, *(static_cast<AudioSegment*>(data->mData.get())));
}
l->NotifyQueuedTrackChanges(this, data->mID,
offset, static_cast<TrackEventCommand>(data->mCommands), *data->mData);
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
l->NotifyQueuedAudioData(this, data->mID,
offset, *(static_cast<AudioSegment*>(data->mData.get())));
}
}
} else {
for (MediaStreamListener* l : aStream->mListeners) {
l->NotifyQueuedAudioData(this, data->mID,
offset, *(static_cast<AudioSegment*>(data->mData.get())));
}
}
}
// Video case.
if (data->mData->GetType() == MediaSegment::VIDEO) {
if (data->mCommands) {
MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED));
for (MediaStreamListener* l : aStream->mListeners) {
l->NotifyQueuedTrackChanges(this, data->mID,
offset, static_cast<TrackEventCommand>(data->mCommands), *data->mData);
}
}
}
for (TrackBound<MediaStreamTrackListener>& b : aStream->mTrackListeners) {
if (b.mTrackID != data->mID) {
continue;
}
b.mListener->NotifyQueuedChanges(this, offset, *data->mData);
if (data->mCommands & SourceMediaStream::TRACK_END) {
b.mListener->NotifyEnded();
}
}
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
MediaSegment* segment = data->mData.forget();
LOG(LogLevel::Debug,
("SourceMediaStream %p creating track %d, start %" PRId64
", initial end %" PRId64,
aStream,
data->mID,
int64_t(data->mStart),
int64_t(segment->GetDuration())));
data->mEndOfFlushedData += segment->GetDuration();
aStream->mTracks.AddTrack(data->mID, data->mStart, segment);
// The track has taken ownership of data->mData, so let's replace
// data->mData with an empty clone.
data->mData = segment->CreateEmptyClone();
data->mCommands &= ~SourceMediaStream::TRACK_CREATE;
shouldNotifyTrackCreated = true;
} else if (data->mData->GetDuration() > 0) {
MediaSegment* dest = aStream->mTracks.FindTrack(data->mID)->GetSegment();
LOG(LogLevel::Verbose,
("SourceMediaStream %p track %d, advancing end from %" PRId64
" to %" PRId64,
aStream,
data->mID,
int64_t(dest->GetDuration()),
int64_t(dest->GetDuration() + data->mData->GetDuration())));
data->mEndOfFlushedData += data->mData->GetDuration();
dest->AppendFrom(data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_END) {
aStream->mTracks.FindTrack(data->mID)->SetEnded();
aStream->mUpdateTracks.RemoveElementAt(i);
}
}
if (shouldNotifyTrackCreated) {
for (MediaStreamListener* l : aStream->mListeners) {
l->NotifyFinishedTrackCreation(this);
}
}
if (!aStream->mFinished) {
aStream->mTracks.AdvanceKnownTracksTime(aStream->mUpdateKnownTracksTime);
}
}
if (aStream->mTracks.GetEnd() > 0) {
aStream->mHasCurrentData = true;
}
if (finished) {
if (aStream->ExtractPendingInput(aDesiredUpToTime, aEnsureNextIteration)) {
FinishStream(aStream);
}
}
@ -2849,6 +2710,170 @@ SourceMediaStream::SetPullEnabled(bool aEnabled)
}
}
bool
SourceMediaStream::ExtractPendingInput(StreamTime aDesiredUpToTime,
bool* aEnsureNextIteration)
{
MutexAutoLock lock(mMutex);
if (mPullEnabled && !mFinished && !mListeners.IsEmpty()) {
// Compute how much stream time we'll need assuming we don't block
// the stream at all.
StreamTime t = GraphTimeToStreamTime(aDesiredUpToTime);
StreamTime current = mTracks.GetEnd();
LOG(LogLevel::Verbose,
("Calling NotifyPull aStream=%p t=%f current end=%f",
this,
GraphImpl()->MediaTimeToSeconds(t),
GraphImpl()->MediaTimeToSeconds(current)));
if (t > current) {
*aEnsureNextIteration = true;
#ifdef DEBUG
if (mListeners.Length() == 0) {
LOG(
LogLevel::Error,
("No listeners in NotifyPull aStream=%p desired=%f current end=%f",
this,
GraphImpl()->MediaTimeToSeconds(t),
GraphImpl()->MediaTimeToSeconds(current)));
DumpTrackInfo();
}
#endif
for (uint32_t j = 0; j < mListeners.Length(); ++j) {
MediaStreamListener* l = mListeners[j];
{
MutexAutoUnlock unlock(mMutex);
l->NotifyPull(GraphImpl(), t);
}
}
}
}
bool finished = mUpdateFinished;
bool shouldNotifyTrackCreated = false;
for (int32_t i = mUpdateTracks.Length() - 1; i >= 0; --i) {
SourceMediaStream::TrackData* data = &mUpdateTracks[i];
ApplyTrackDisabling(data->mID, data->mData);
// Dealing with NotifyQueuedTrackChanges and NotifyQueuedAudioData part.
// The logic is different from the manipulating of aStream->mTracks part.
// So it is not combined with the manipulating of aStream->mTracks part.
StreamTime offset =
(data->mCommands & SourceMediaStream::TRACK_CREATE)
? data->mStart
: mTracks.FindTrack(data->mID)->GetSegment()->GetDuration();
// Audio case.
if (data->mData->GetType() == MediaSegment::AUDIO) {
if (data->mCommands) {
MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED));
for (MediaStreamListener* l : mListeners) {
if (data->mCommands & SourceMediaStream::TRACK_END) {
l->NotifyQueuedAudioData(
GraphImpl(),
data->mID,
offset,
*(static_cast<AudioSegment*>(data->mData.get())));
}
l->NotifyQueuedTrackChanges(
GraphImpl(),
data->mID,
offset,
static_cast<TrackEventCommand>(data->mCommands),
*data->mData);
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
l->NotifyQueuedAudioData(
GraphImpl(),
data->mID,
offset,
*(static_cast<AudioSegment*>(data->mData.get())));
}
}
} else {
for (MediaStreamListener* l : mListeners) {
l->NotifyQueuedAudioData(
GraphImpl(),
data->mID,
offset,
*(static_cast<AudioSegment*>(data->mData.get())));
}
}
}
// Video case.
if (data->mData->GetType() == MediaSegment::VIDEO) {
if (data->mCommands) {
MOZ_ASSERT(!(data->mCommands & SourceMediaStream::TRACK_UNUSED));
for (MediaStreamListener* l : mListeners) {
l->NotifyQueuedTrackChanges(
GraphImpl(),
data->mID,
offset,
static_cast<TrackEventCommand>(data->mCommands),
*data->mData);
}
}
}
for (TrackBound<MediaStreamTrackListener>& b : mTrackListeners) {
if (b.mTrackID != data->mID) {
continue;
}
b.mListener->NotifyQueuedChanges(GraphImpl(), offset, *data->mData);
if (data->mCommands & SourceMediaStream::TRACK_END) {
b.mListener->NotifyEnded();
}
}
if (data->mCommands & SourceMediaStream::TRACK_CREATE) {
MediaSegment* segment = data->mData.forget();
LOG(LogLevel::Debug,
("SourceMediaStream %p creating track %d, start %" PRId64
", initial end %" PRId64,
this,
data->mID,
int64_t(data->mStart),
int64_t(segment->GetDuration())));
data->mEndOfFlushedData += segment->GetDuration();
mTracks.AddTrack(data->mID, data->mStart, segment);
// The track has taken ownership of data->mData, so let's replace
// data->mData with an empty clone.
data->mData = segment->CreateEmptyClone();
data->mCommands &= ~SourceMediaStream::TRACK_CREATE;
shouldNotifyTrackCreated = true;
} else if (data->mData->GetDuration() > 0) {
MediaSegment* dest = mTracks.FindTrack(data->mID)->GetSegment();
LOG(LogLevel::Verbose,
("SourceMediaStream %p track %d, advancing end from %" PRId64
" to %" PRId64,
this,
data->mID,
int64_t(dest->GetDuration()),
int64_t(dest->GetDuration() + data->mData->GetDuration())));
data->mEndOfFlushedData += data->mData->GetDuration();
dest->AppendFrom(data->mData);
}
if (data->mCommands & SourceMediaStream::TRACK_END) {
mTracks.FindTrack(data->mID)->SetEnded();
mUpdateTracks.RemoveElementAt(i);
}
}
if (shouldNotifyTrackCreated) {
for (MediaStreamListener* l : mListeners) {
l->NotifyFinishedTrackCreation(GraphImpl());
}
}
if (!mFinished) {
mTracks.AdvanceKnownTracksTime(mUpdateKnownTracksTime);
}
if (mTracks.GetEnd() > 0) {
mHasCurrentData = true;
}
return finished;
}
void
SourceMediaStream::AddTrackInternal(TrackID aID, TrackRate aRate, StreamTime aStart,
MediaSegment* aSegment, uint32_t aFlags)

Просмотреть файл

@ -697,6 +697,13 @@ public:
*/
void SetPullEnabled(bool aEnabled);
/**
* Extract any state updates pending in the stream, and apply them.
* Returns true if the stream is now finished.
*/
bool ExtractPendingInput(StreamTime aDesiredUpToTime,
bool* aEnsureNextIteration);
/**
* These add/remove DirectListeners, which allow bypassing the graph and any
* synchronization delays for e.g. PeerConnection, which wants the data ASAP