diff --git a/third_party/libwebrtc/README.moz-ff-commit b/third_party/libwebrtc/README.moz-ff-commit index 15725584cbd9..85a27d87cb0e 100644 --- a/third_party/libwebrtc/README.moz-ff-commit +++ b/third_party/libwebrtc/README.moz-ff-commit @@ -5967,3 +5967,6 @@ a1b8201009 # MOZ_LIBWEBRTC_SRC=/home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src MOZ_LIBWEBRTC_COMMIT=mjfdev bash dom/media/webrtc/third_party_build/fast-forward-libwebrtc.sh # base of lastest vendoring 791adafa09 +# MOZ_LIBWEBRTC_SRC=/home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src MOZ_LIBWEBRTC_COMMIT=mjfdev bash dom/media/webrtc/third_party_build/fast-forward-libwebrtc.sh +# base of lastest vendoring +bd9031bf22 diff --git a/third_party/libwebrtc/README.mozilla b/third_party/libwebrtc/README.mozilla index e0a12b9dd86f..beb4c1a77381 100644 --- a/third_party/libwebrtc/README.mozilla +++ b/third_party/libwebrtc/README.mozilla @@ -3980,3 +3980,5 @@ libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwe libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:37:39.311600. # python3 vendor-libwebrtc.py --from-local /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src --commit mjfdev libwebrtc libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:40:03.276020. +# python3 vendor-libwebrtc.py --from-local /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src --commit mjfdev libwebrtc +libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:40:43.817898. diff --git a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc index b8d07c7600c7..42d09f499f6e 100644 --- a/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc +++ b/third_party/libwebrtc/net/dcsctp/socket/dcsctp_socket.cc @@ -167,9 +167,12 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix, TimerOptions(options.t2_shutdown_timeout, TimerBackoffAlgorithm::kExponential, options.max_retransmissions))), - send_queue_(log_prefix_, - options_.max_send_buffer_size, - [](StreamID stream_id) {}) {} + send_queue_( + log_prefix_, + options_.max_send_buffer_size, + [](StreamID stream_id) {}, + /*total_buffered_amount_low_threshold=*/0, + []() {}) {} std::string DcSctpSocket::log_prefix() const { return log_prefix_ + "[" + std::string(ToString(state_)) + "] "; diff --git a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h index 73f1bd0314db..0cf64583ae88 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/mock_send_queue.h @@ -44,6 +44,7 @@ class MockSendQueue : public SendQueue { MOCK_METHOD(void, RollbackResetStreams, (), (override)); MOCK_METHOD(void, Reset, (), (override)); MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override)); + MOCK_METHOD(size_t, total_buffered_amount, (), (const, override)); MOCK_METHOD(size_t, buffered_amount_low_threshold, (StreamID stream_id), diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc index 77bb3168b40a..027e5b8271a2 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.cc @@ -39,6 +39,7 @@ RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) { *item.expires_at <= now) { // TODO(boivie): This should be reported to the client. buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); continue; } @@ -50,6 +51,14 @@ RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) { return nullptr; } +bool RRSendQueue::IsConsistent() const { + size_t total_buffered_amount = 0; + for (const auto& stream_entry : streams_) { + total_buffered_amount += stream_entry.second.buffered_amount().value(); + } + return total_buffered_amount == total_buffered_amount_.value(); +} + bool RRSendQueue::OutgoingStream::IsConsistent() const { size_t bytes = 0; for (const auto& item : items_) { @@ -80,6 +89,7 @@ void RRSendQueue::OutgoingStream::Add(DcSctpMessage message, absl::optional expires_at, const SendOptions& send_options) { buffered_amount_.Increase(message.payload().size()); + total_buffered_amount_.Increase(message.payload().size()); items_.emplace_back(std::move(message), expires_at, send_options); RTC_DCHECK(IsConsistent()); @@ -141,6 +151,7 @@ absl::optional RRSendQueue::OutgoingStream::Produce( FSN fsn(item->current_fsn); item->current_fsn = FSN(*item->current_fsn + 1); buffered_amount_.Decrease(payload.size()); + total_buffered_amount_.Decrease(payload.size()); SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)), item->message_id.value(), fsn, ppid, @@ -172,6 +183,7 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, if (item.send_options.unordered == unordered && item.message_id.has_value() && *item.message_id == message_id) { buffered_amount_.Decrease(item.remaining_size); + total_buffered_amount_.Decrease(item.remaining_size); items_.pop_front(); // As the item still existed, it had unsent data. result = true; @@ -193,6 +205,7 @@ void RRSendQueue::OutgoingStream::Pause() { for (auto it = items_.begin(); it != items_.end();) { if (it->remaining_offset == 0) { buffered_amount_.Decrease(it->remaining_size); + total_buffered_amount_.Decrease(it->remaining_size); it = items_.erase(it); } else { ++it; @@ -208,6 +221,8 @@ void RRSendQueue::OutgoingStream::Reset() { auto& item = items_.front(); buffered_amount_.Increase(item.message.payload().size() - item.remaining_size); + total_buffered_amount_.Increase(item.message.payload().size() - + item.remaining_size); item.remaining_offset = 0; item.remaining_size = item.message.payload().size(); item.message_id = absl::nullopt; @@ -243,25 +258,15 @@ void RRSendQueue::Add(TimeMs now, } GetOrCreateStreamInfo(message.stream_id()) .Add(std::move(message), expires_at, send_options); -} - -size_t RRSendQueue::total_bytes() const { - // TODO(boivie): Have the current size as a member variable, so that it's not - // calculated for every operation. - size_t bytes = 0; - for (const auto& stream : streams_) { - bytes += stream.second.buffered_amount().value(); - } - - return bytes; + RTC_DCHECK(IsConsistent()); } bool RRSendQueue::IsFull() const { - return total_bytes() >= buffer_size_; + return total_buffered_amount() >= buffer_size_; } bool RRSendQueue::IsEmpty() const { - return total_bytes() == 0; + return total_buffered_amount() == 0; } absl::optional RRSendQueue::Produce( @@ -279,7 +284,7 @@ absl::optional RRSendQueue::Produce( next_stream_id_ = StreamID(*it->first + 1); } } - + RTC_DCHECK(IsConsistent()); return data; } @@ -312,6 +317,7 @@ void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { for (StreamID stream_id : streams) { GetOrCreateStreamInfo(stream_id).Pause(); } + RTC_DCHECK(IsConsistent()); } bool RRSendQueue::CanResetStreams() const { @@ -328,15 +334,19 @@ bool RRSendQueue::CanResetStreams() const { void RRSendQueue::CommitResetStreams() { Reset(); + RTC_DCHECK(IsConsistent()); } void RRSendQueue::RollbackResetStreams() { for (auto& stream_entry : streams_) { stream_entry.second.Resume(); } + RTC_DCHECK(IsConsistent()); } void RRSendQueue::Reset() { + // Recalculate buffered amount, as partially sent messages may have been put + // fully back in the queue. for (auto& stream_entry : streams_) { OutgoingStream& stream = stream_entry.second; stream.Reset(); @@ -373,7 +383,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo( return streams_ .emplace(stream_id, - [this, stream_id]() { on_buffered_amount_low_(stream_id); }) + OutgoingStream( + [this, stream_id]() { on_buffered_amount_low_(stream_id); }, + total_buffered_amount_)) .first->second; } } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h index d7fcc9542e4a..bd96bb9e8b6f 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue.h @@ -46,10 +46,15 @@ class RRSendQueue : public SendQueue { RRSendQueue(absl::string_view log_prefix, size_t buffer_size, - std::function on_buffered_amount_low) + std::function on_buffered_amount_low, + size_t total_buffered_amount_low_threshold, + std::function on_total_buffered_amount_low) : log_prefix_(std::string(log_prefix) + "fcfs: "), buffer_size_(buffer_size), - on_buffered_amount_low_(std::move(on_buffered_amount_low)) {} + on_buffered_amount_low_(std::move(on_buffered_amount_low)), + total_buffered_amount_(std::move(on_total_buffered_amount_low)) { + total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); + } // Indicates if the buffer is full. Note that it's up to the caller to ensure // that the buffer is not full prior to adding new items to it. @@ -76,12 +81,12 @@ class RRSendQueue : public SendQueue { void RollbackResetStreams() override; void Reset() override; size_t buffered_amount(StreamID stream_id) const override; + size_t total_buffered_amount() const override { + return total_buffered_amount_.value(); + } size_t buffered_amount_low_threshold(StreamID stream_id) const override; void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; - // The size of the buffer, in "payload bytes". - size_t total_bytes() const; - private: // Represents a value and a "low threshold" that when the value reaches or // goes under the "low threshold", will trigger `on_threshold_reached` @@ -109,8 +114,10 @@ class RRSendQueue : public SendQueue { // Per-stream information. class OutgoingStream { public: - explicit OutgoingStream(std::function on_buffered_amount_low) - : buffered_amount_(std::move(on_buffered_amount_low)) {} + explicit OutgoingStream(std::function on_buffered_amount_low, + ThresholdWatcher& total_buffered_amount) + : buffered_amount_(std::move(on_buffered_amount_low)), + total_buffered_amount_(total_buffered_amount) {} // Enqueues a message to this stream. void Add(DcSctpMessage message, @@ -182,8 +189,13 @@ class RRSendQueue : public SendQueue { // The current amount of buffered data. ThresholdWatcher buffered_amount_; + + // Reference to the total buffered amount, which is updated directly by each + // stream. + ThresholdWatcher& total_buffered_amount_; }; + bool IsConsistent() const; OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id); absl::optional Produce( std::map::iterator it, @@ -192,10 +204,18 @@ class RRSendQueue : public SendQueue { const std::string log_prefix_; const size_t buffer_size_; + // Called when the buffered amount is below what has been set using // `SetBufferedAmountLowThreshold`. const std::function on_buffered_amount_low_; + // Called when the total buffered amount is below what has been set using + // `SetTotalBufferedAmountLowThreshold`. + const std::function on_total_buffered_amount_low_; + + // The total amount of buffer data, for all streams. + ThresholdWatcher total_buffered_amount_; + // The next stream to send chunks from. StreamID next_stream_id_ = StreamID(0); diff --git a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc index 3bc748caaca0..e4897b70cbdc 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc +++ b/third_party/libwebrtc/net/dcsctp/tx/rr_send_queue_test.cc @@ -31,17 +31,24 @@ constexpr TimeMs kNow = TimeMs(0); constexpr StreamID kStreamID(1); constexpr PPID kPPID(53); constexpr size_t kMaxQueueSize = 1000; +constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kOneFragmentPacketSize = 100; constexpr size_t kTwoFragmentPacketSize = 101; class RRSendQueueTest : public testing::Test { protected: RRSendQueueTest() - : buf_("log: ", kMaxQueueSize, on_buffered_amount_low_.AsStdFunction()) {} + : buf_("log: ", + kMaxQueueSize, + on_buffered_amount_low_.AsStdFunction(), + kBufferedAmountLowThreshold, + on_total_buffered_amount_low_.AsStdFunction()) {} const DcSctpOptions options_; testing::NiceMock> on_buffered_amount_low_; + testing::NiceMock> + on_total_buffered_amount_low_; RRSendQueue buf_; }; @@ -272,13 +279,13 @@ TEST_F(RRSendQueueTest, DiscardPartialPackets) { TEST_F(RRSendQueueTest, PrepareResetStreamsDiscardsStream) { buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3})); buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5})); - EXPECT_EQ(buf_.total_bytes(), 8u); + EXPECT_EQ(buf_.total_buffered_amount(), 8u); buf_.PrepareResetStreams(std::vector({StreamID(1)})); - EXPECT_EQ(buf_.total_bytes(), 5u); + EXPECT_EQ(buf_.total_buffered_amount(), 5u); buf_.CommitResetStreams(); buf_.PrepareResetStreams(std::vector({StreamID(2)})); - EXPECT_EQ(buf_.total_bytes(), 0u); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); } TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { @@ -290,30 +297,30 @@ TEST_F(RRSendQueueTest, PrepareResetStreamsNotPartialPackets) { absl::optional chunk_one = buf_.Produce(kNow, 50); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_bytes(), 2 * payload.size() - 50); + EXPECT_EQ(buf_.total_buffered_amount(), 2 * payload.size() - 50); StreamID stream_ids[] = {StreamID(1)}; buf_.PrepareResetStreams(stream_ids); - EXPECT_EQ(buf_.total_bytes(), payload.size() - 50); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size() - 50); } TEST_F(RRSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) { std::vector payload(50); buf_.PrepareResetStreams(std::vector({StreamID(1)})); - EXPECT_EQ(buf_.total_bytes(), 0u); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); - EXPECT_EQ(buf_.total_bytes(), payload.size()); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); buf_.CommitResetStreams(); - EXPECT_EQ(buf_.total_bytes(), payload.size()); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); absl::optional chunk_one = buf_.Produce(kNow, 50); ASSERT_TRUE(chunk_one.has_value()); EXPECT_EQ(chunk_one->data.stream_id, kStreamID); - EXPECT_EQ(buf_.total_bytes(), 0u); + EXPECT_EQ(buf_.total_buffered_amount(), 0u); } TEST_F(RRSendQueueTest, CommittingResetsSSN) { @@ -633,5 +640,31 @@ TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) { buf_.SetBufferedAmountLowThreshold(StreamID(1), 0); } +TEST_F(RRSendQueueTest, + OnTotalBufferedAmountLowDoesNotTriggerOnBufferFillingUp) { + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0); + std::vector payload(kBufferedAmountLowThreshold - 1); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + // Will not trigger if going above but never below. + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, + std::vector(kOneFragmentPacketSize))); +} + +TEST_F(RRSendQueueTest, TriggersOnTotalBufferedAmountLowWhenCrossing) { + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(0); + std::vector payload(kBufferedAmountLowThreshold); + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload)); + EXPECT_EQ(buf_.total_buffered_amount(), payload.size()); + + // Reaches it. + buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, std::vector(1))); + + // Drain it a bit - will trigger. + EXPECT_CALL(on_total_buffered_amount_low_, Call).Times(1); + absl::optional chunk_two = + buf_.Produce(kNow, kOneFragmentPacketSize); +} } // namespace } // namespace dcsctp diff --git a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h index 67bd29476860..877dbdda5985 100644 --- a/third_party/libwebrtc/net/dcsctp/tx/send_queue.h +++ b/third_party/libwebrtc/net/dcsctp/tx/send_queue.h @@ -113,6 +113,9 @@ class SendQueue { // e.g. inflight. virtual size_t buffered_amount(StreamID stream_id) const = 0; + // Returns the total amount of buffer data, for all streams. + virtual size_t total_buffered_amount() const = 0; + // Returns the limit for the `OnBufferedAmountLow` event. Default value is 0. virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0;