зеркало из https://github.com/mozilla/gecko-dev.git
Bug 1766646 - Vendor libwebrtc from 791adafa09
Upstream commit: https://webrtc.googlesource.com/src/+/791adafa09a1fdf6122e3f5b45c1e397bc6223a0 dcsctp: Add OnBufferedAmountLow in Send Queue This adds the necessary properties and callback to the Send Queue to support the bufferedAmount & bufferedAmountLowThreshold properties and the bufferedamountlow event in RTCDataChannel. The public API changes and socket support comes in a follow-up CL. Bug: webrtc:12794 Change-Id: I12a16f44f775da3711f3aa52a68a0bf24f70d2f8 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219690 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/master@{#34142}
This commit is contained in:
Родитель
690b84719e
Коммит
e540e11081
|
@ -5964,3 +5964,6 @@ e52cfab633
|
|||
# MOZ_LIBWEBRTC_SRC=/home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src MOZ_LIBWEBRTC_COMMIT=mjfdev bash dom/media/webrtc/third_party_build/fast-forward-libwebrtc.sh
|
||||
# base of lastest vendoring
|
||||
a1b8201009
|
||||
# MOZ_LIBWEBRTC_SRC=/home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src MOZ_LIBWEBRTC_COMMIT=mjfdev bash dom/media/webrtc/third_party_build/fast-forward-libwebrtc.sh
|
||||
# base of lastest vendoring
|
||||
791adafa09
|
||||
|
|
|
@ -3978,3 +3978,5 @@ libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwe
|
|||
libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:36:57.472504.
|
||||
# python3 vendor-libwebrtc.py --from-local /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src --commit mjfdev libwebrtc
|
||||
libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:37:39.311600.
|
||||
# python3 vendor-libwebrtc.py --from-local /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src --commit mjfdev libwebrtc
|
||||
libwebrtc updated from /home/mfroman/git-checkouts/trial-webrtc-builds/moz-libwebrtc-checkout/src commit mjfdev on 2022-05-26T17:40:03.276020.
|
||||
|
|
|
@ -167,7 +167,9 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
|
|||
TimerOptions(options.t2_shutdown_timeout,
|
||||
TimerBackoffAlgorithm::kExponential,
|
||||
options.max_retransmissions))),
|
||||
send_queue_(log_prefix_, options_.max_send_buffer_size) {}
|
||||
send_queue_(log_prefix_,
|
||||
options_.max_send_buffer_size,
|
||||
[](StreamID stream_id) {}) {}
|
||||
|
||||
std::string DcSctpSocket::log_prefix() const {
|
||||
return log_prefix_ + "[" + std::string(ToString(state_)) + "] ";
|
||||
|
|
|
@ -43,6 +43,15 @@ class MockSendQueue : public SendQueue {
|
|||
MOCK_METHOD(void, CommitResetStreams, (), (override));
|
||||
MOCK_METHOD(void, RollbackResetStreams, (), (override));
|
||||
MOCK_METHOD(void, Reset, (), (override));
|
||||
MOCK_METHOD(size_t, buffered_amount, (StreamID stream_id), (const, override));
|
||||
MOCK_METHOD(size_t,
|
||||
buffered_amount_low_threshold,
|
||||
(StreamID stream_id),
|
||||
(const, override));
|
||||
MOCK_METHOD(void,
|
||||
SetBufferedAmountLowThreshold,
|
||||
(StreamID stream_id, size_t bytes),
|
||||
(override));
|
||||
};
|
||||
|
||||
} // namespace dcsctp
|
||||
|
|
|
@ -38,19 +38,51 @@ RRSendQueue::OutgoingStream::GetFirstNonExpiredMessage(TimeMs now) {
|
|||
if (!item.message_id.has_value() && item.expires_at.has_value() &&
|
||||
*item.expires_at <= now) {
|
||||
// TODO(boivie): This should be reported to the client.
|
||||
buffered_amount_.Decrease(item.remaining_size);
|
||||
items_.pop_front();
|
||||
continue;
|
||||
}
|
||||
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return &item;
|
||||
}
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
bool RRSendQueue::OutgoingStream::IsConsistent() const {
|
||||
size_t bytes = 0;
|
||||
for (const auto& item : items_) {
|
||||
bytes += item.remaining_size;
|
||||
}
|
||||
return bytes == buffered_amount_.value();
|
||||
}
|
||||
|
||||
void RRSendQueue::ThresholdWatcher::Decrease(size_t bytes) {
|
||||
RTC_DCHECK(bytes <= value_);
|
||||
size_t old_value = value_;
|
||||
value_ -= bytes;
|
||||
|
||||
if (old_value > low_threshold_ && value_ <= low_threshold_) {
|
||||
on_threshold_reached_();
|
||||
}
|
||||
}
|
||||
|
||||
void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
|
||||
// Betting on https://github.com/w3c/webrtc-pc/issues/2654 being accepted.
|
||||
if (low_threshold_ < value_ && low_threshold >= value_) {
|
||||
on_threshold_reached_();
|
||||
}
|
||||
low_threshold_ = low_threshold;
|
||||
}
|
||||
|
||||
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
|
||||
absl::optional<TimeMs> expires_at,
|
||||
const SendOptions& send_options) {
|
||||
buffered_amount_.Increase(message.payload().size());
|
||||
items_.emplace_back(std::move(message), expires_at, send_options);
|
||||
|
||||
RTC_DCHECK(IsConsistent());
|
||||
}
|
||||
|
||||
absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
||||
|
@ -58,18 +90,21 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
|||
size_t max_size) {
|
||||
Item* item = GetFirstNonExpiredMessage(now);
|
||||
if (item == nullptr) {
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
// If a stream is paused, it will allow sending all partially sent messages
|
||||
// but will not start sending new fragments of completely unsent messages.
|
||||
if (is_paused_ && !item->message_id.has_value()) {
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
DcSctpMessage& message = item->message;
|
||||
|
||||
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
|
@ -105,6 +140,7 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
|||
|
||||
FSN fsn(item->current_fsn);
|
||||
item->current_fsn = FSN(*item->current_fsn + 1);
|
||||
buffered_amount_.Decrease(payload.size());
|
||||
|
||||
SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
|
||||
item->message_id.value(), fsn, ppid,
|
||||
|
@ -124,47 +160,45 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
|
|||
item->message.payload().size());
|
||||
RTC_DCHECK(item->remaining_size > 0);
|
||||
}
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return chunk;
|
||||
}
|
||||
|
||||
size_t RRSendQueue::OutgoingStream::buffered_amount() const {
|
||||
size_t bytes = 0;
|
||||
for (const auto& item : items_) {
|
||||
bytes += item.remaining_size;
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
|
||||
MID message_id) {
|
||||
bool result = false;
|
||||
if (!items_.empty()) {
|
||||
Item& item = items_.front();
|
||||
if (item.send_options.unordered == unordered &&
|
||||
item.message_id.has_value() && *item.message_id == message_id) {
|
||||
buffered_amount_.Decrease(item.remaining_size);
|
||||
items_.pop_front();
|
||||
// As the item still existed, it had unsent data.
|
||||
return true;
|
||||
result = true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
RTC_DCHECK(IsConsistent());
|
||||
return result;
|
||||
}
|
||||
|
||||
void RRSendQueue::OutgoingStream::Pause() {
|
||||
is_paused_ = true;
|
||||
|
||||
// A stream is pause when it's about to be reset. In this implementation,
|
||||
// A stream is paused when it's about to be reset. In this implementation,
|
||||
// it will throw away all non-partially send messages. This is subject to
|
||||
// change. It will however not discard any partially sent messages - only
|
||||
// whole messages. Partially delivered messages (at the time of receiving a
|
||||
// Stream Reset command) will always deliver all the fragments before actually
|
||||
// resetting the stream.
|
||||
// Stream Reset command) will always deliver all the fragments before
|
||||
// actually resetting the stream.
|
||||
for (auto it = items_.begin(); it != items_.end();) {
|
||||
if (it->remaining_offset == 0) {
|
||||
buffered_amount_.Decrease(it->remaining_size);
|
||||
it = items_.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
RTC_DCHECK(IsConsistent());
|
||||
}
|
||||
|
||||
void RRSendQueue::OutgoingStream::Reset() {
|
||||
|
@ -172,6 +206,8 @@ void RRSendQueue::OutgoingStream::Reset() {
|
|||
// If this message has been partially sent, reset it so that it will be
|
||||
// re-sent.
|
||||
auto& item = items_.front();
|
||||
buffered_amount_.Increase(item.message.payload().size() -
|
||||
item.remaining_size);
|
||||
item.remaining_offset = 0;
|
||||
item.remaining_size = item.message.payload().size();
|
||||
item.message_id = absl::nullopt;
|
||||
|
@ -182,6 +218,7 @@ void RRSendQueue::OutgoingStream::Reset() {
|
|||
next_ordered_mid_ = MID(0);
|
||||
next_unordered_mid_ = MID(0);
|
||||
next_ssn_ = SSN(0);
|
||||
RTC_DCHECK(IsConsistent());
|
||||
}
|
||||
|
||||
bool RRSendQueue::OutgoingStream::has_partially_sent_message() const {
|
||||
|
@ -209,11 +246,11 @@ void RRSendQueue::Add(TimeMs now,
|
|||
}
|
||||
|
||||
size_t RRSendQueue::total_bytes() const {
|
||||
// TODO(boivie): Have the current size as a member variable, so that's it not
|
||||
// TODO(boivie): Have the current size as a member variable, so that it's not
|
||||
// calculated for every operation.
|
||||
size_t bytes = 0;
|
||||
for (const auto& stream : streams_) {
|
||||
bytes += stream.second.buffered_amount();
|
||||
bytes += stream.second.buffered_amount().value();
|
||||
}
|
||||
|
||||
return bytes;
|
||||
|
@ -306,6 +343,27 @@ void RRSendQueue::Reset() {
|
|||
}
|
||||
}
|
||||
|
||||
size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
|
||||
auto it = streams_.find(stream_id);
|
||||
if (it == streams_.end()) {
|
||||
return 0;
|
||||
}
|
||||
return it->second.buffered_amount().value();
|
||||
}
|
||||
|
||||
size_t RRSendQueue::buffered_amount_low_threshold(StreamID stream_id) const {
|
||||
auto it = streams_.find(stream_id);
|
||||
if (it == streams_.end()) {
|
||||
return 0;
|
||||
}
|
||||
return it->second.buffered_amount().low_threshold();
|
||||
}
|
||||
|
||||
void RRSendQueue::SetBufferedAmountLowThreshold(StreamID stream_id,
|
||||
size_t bytes) {
|
||||
GetOrCreateStreamInfo(stream_id).buffered_amount().SetLowThreshold(bytes);
|
||||
}
|
||||
|
||||
RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
|
||||
StreamID stream_id) {
|
||||
auto it = streams_.find(stream_id);
|
||||
|
@ -313,6 +371,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
|
|||
return it->second;
|
||||
}
|
||||
|
||||
return streams_.emplace(stream_id, OutgoingStream()).first->second;
|
||||
return streams_
|
||||
.emplace(stream_id,
|
||||
[this, stream_id]() { on_buffered_amount_low_(stream_id); })
|
||||
.first->second;
|
||||
}
|
||||
} // namespace dcsctp
|
||||
|
|
|
@ -44,9 +44,12 @@ class RRSendQueue : public SendQueue {
|
|||
// How small a data chunk's payload may be, if having to fragment a message.
|
||||
static constexpr size_t kMinimumFragmentedPayload = 10;
|
||||
|
||||
RRSendQueue(absl::string_view log_prefix, size_t buffer_size)
|
||||
RRSendQueue(absl::string_view log_prefix,
|
||||
size_t buffer_size,
|
||||
std::function<void(StreamID)> on_buffered_amount_low)
|
||||
: log_prefix_(std::string(log_prefix) + "fcfs: "),
|
||||
buffer_size_(buffer_size) {}
|
||||
buffer_size_(buffer_size),
|
||||
on_buffered_amount_low_(std::move(on_buffered_amount_low)) {}
|
||||
|
||||
// Indicates if the buffer is full. Note that it's up to the caller to ensure
|
||||
// that the buffer is not full prior to adding new items to it.
|
||||
|
@ -72,14 +75,43 @@ class RRSendQueue : public SendQueue {
|
|||
void CommitResetStreams() override;
|
||||
void RollbackResetStreams() override;
|
||||
void Reset() override;
|
||||
size_t buffered_amount(StreamID stream_id) const override;
|
||||
size_t buffered_amount_low_threshold(StreamID stream_id) const override;
|
||||
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
|
||||
|
||||
// The size of the buffer, in "payload bytes".
|
||||
size_t total_bytes() const;
|
||||
|
||||
private:
|
||||
// Represents a value and a "low threshold" that when the value reaches or
|
||||
// goes under the "low threshold", will trigger `on_threshold_reached`
|
||||
// callback.
|
||||
class ThresholdWatcher {
|
||||
public:
|
||||
explicit ThresholdWatcher(std::function<void()> on_threshold_reached)
|
||||
: on_threshold_reached_(std::move(on_threshold_reached)) {}
|
||||
// Increases the value.
|
||||
void Increase(size_t bytes) { value_ += bytes; }
|
||||
// Decreases the value and triggers `on_threshold_reached` if it's at or
|
||||
// below `low_threshold()`.
|
||||
void Decrease(size_t bytes);
|
||||
|
||||
size_t value() const { return value_; }
|
||||
size_t low_threshold() const { return low_threshold_; }
|
||||
void SetLowThreshold(size_t low_threshold);
|
||||
|
||||
private:
|
||||
const std::function<void()> on_threshold_reached_;
|
||||
size_t value_ = 0;
|
||||
size_t low_threshold_ = 0;
|
||||
};
|
||||
|
||||
// Per-stream information.
|
||||
class OutgoingStream {
|
||||
public:
|
||||
explicit OutgoingStream(std::function<void()> on_buffered_amount_low)
|
||||
: buffered_amount_(std::move(on_buffered_amount_low)) {}
|
||||
|
||||
// Enqueues a message to this stream.
|
||||
void Add(DcSctpMessage message,
|
||||
absl::optional<TimeMs> expires_at,
|
||||
|
@ -88,8 +120,8 @@ class RRSendQueue : public SendQueue {
|
|||
// Possibly produces a data chunk to send.
|
||||
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
|
||||
|
||||
// The amount of data enqueued on this stream.
|
||||
size_t buffered_amount() const;
|
||||
const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
|
||||
ThresholdWatcher& buffered_amount() { return buffered_amount_; }
|
||||
|
||||
// Discards a partially sent message, see `SendQueue::Discard`.
|
||||
bool Discard(IsUnordered unordered, MID message_id);
|
||||
|
@ -136,6 +168,7 @@ class RRSendQueue : public SendQueue {
|
|||
|
||||
// Returns the first non-expired message, or nullptr if there isn't one.
|
||||
Item* GetFirstNonExpiredMessage(TimeMs now);
|
||||
bool IsConsistent() const;
|
||||
|
||||
// Streams are pause when they are about to be reset.
|
||||
bool is_paused_ = false;
|
||||
|
@ -146,6 +179,9 @@ class RRSendQueue : public SendQueue {
|
|||
SSN next_ssn_ = SSN(0);
|
||||
// Enqueued messages, and metadata.
|
||||
std::deque<Item> items_;
|
||||
|
||||
// The current amount of buffered data.
|
||||
ThresholdWatcher buffered_amount_;
|
||||
};
|
||||
|
||||
OutgoingStream& GetOrCreateStreamInfo(StreamID stream_id);
|
||||
|
@ -156,6 +192,9 @@ class RRSendQueue : public SendQueue {
|
|||
|
||||
const std::string log_prefix_;
|
||||
const size_t buffer_size_;
|
||||
// Called when the buffered amount is below what has been set using
|
||||
// `SetBufferedAmountLowThreshold`.
|
||||
const std::function<void(StreamID)> on_buffered_amount_low_;
|
||||
|
||||
// The next stream to send chunks from.
|
||||
StreamID next_stream_id_ = StreamID(0);
|
||||
|
|
|
@ -36,9 +36,12 @@ constexpr size_t kTwoFragmentPacketSize = 101;
|
|||
|
||||
class RRSendQueueTest : public testing::Test {
|
||||
protected:
|
||||
RRSendQueueTest() : buf_("log: ", kMaxQueueSize) {}
|
||||
RRSendQueueTest()
|
||||
: buf_("log: ", kMaxQueueSize, on_buffered_amount_low_.AsStdFunction()) {}
|
||||
|
||||
const DcSctpOptions options_;
|
||||
testing::NiceMock<testing::MockFunction<void(StreamID)>>
|
||||
on_buffered_amount_low_;
|
||||
RRSendQueue buf_;
|
||||
};
|
||||
|
||||
|
@ -474,5 +477,161 @@ TEST_F(RRSendQueueTest, WillCycleInRoundRobinFashionBetweenStreams) {
|
|||
EXPECT_EQ(chunk8.data.stream_id, StreamID(4));
|
||||
EXPECT_THAT(chunk8.data.payload, SizeIs(8));
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, DoesntTriggerOnBufferedAmountLowWhenSetToZero) {
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 0u);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, TriggersOnBufferedAmountAtZeroLowWhenSent) {
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk1.data.payload, SizeIs(1));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowIfAddingMore) {
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk1.data.payload, SizeIs(1));
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1)));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 1u);
|
||||
|
||||
// Should now trigger again, as buffer_amount went above the threshold.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk2.data.payload, SizeIs(1));
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, OnlyTriggersWhenTransitioningFromAboveToBelowOrEqual) {
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 1000);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(10)));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 10u);
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk1.data.payload, SizeIs(10));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(20)));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 20u);
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk2.data.payload, SizeIs(20));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 0u);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillTriggerOnBufferedAmountLowSetAboveZero) {
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
|
||||
|
||||
std::vector<uint8_t> payload(1000);
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, payload));
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk1.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 900u);
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk2.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk3,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 700u);
|
||||
|
||||
// Doesn't trigger when reducing even further.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk4,
|
||||
buf_.Produce(kNow, kOneFragmentPacketSize));
|
||||
EXPECT_EQ(chunk3.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk3.data.payload, SizeIs(kOneFragmentPacketSize));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, WillRetriggerOnBufferedAmountLowSetAboveZero) {
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 700);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(1000)));
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk1,
|
||||
buf_.Produce(kNow, 400));
|
||||
EXPECT_EQ(chunk1.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk1.data.payload, SizeIs(400));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
|
||||
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(200)));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 800u);
|
||||
|
||||
// Will trigger again, as it went above the limit.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
ASSERT_HAS_VALUE_AND_ASSIGN(SendQueue::DataToSend chunk2,
|
||||
buf_.Produce(kNow, 200));
|
||||
EXPECT_EQ(chunk2.data.stream_id, StreamID(1));
|
||||
EXPECT_THAT(chunk2.data.payload, SizeIs(200));
|
||||
EXPECT_EQ(buf_.buffered_amount(StreamID(1)), 600u);
|
||||
}
|
||||
|
||||
TEST_F(RRSendQueueTest, TriggersOnBufferedAmountLowOnThresholdChanged) {
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(1), kPPID, std::vector<uint8_t>(100)));
|
||||
|
||||
// Modifying the threshold, still under buffered_amount, should not trigger.
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 99);
|
||||
|
||||
// When the threshold reaches buffered_amount, it will trigger.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 100);
|
||||
|
||||
// But not when it's set low again.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 50);
|
||||
|
||||
// But it will trigger when it overshoots.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call(StreamID(1)));
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 150);
|
||||
|
||||
// But not when it's set low again.
|
||||
EXPECT_CALL(on_buffered_amount_low_, Call).Times(0);
|
||||
buf_.SetBufferedAmountLowThreshold(StreamID(1), 0);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
|
|
|
@ -108,6 +108,17 @@ class SendQueue {
|
|||
// of data loss. However, data loss cannot be completely guaranteed when a
|
||||
// peer restarts.
|
||||
virtual void Reset() = 0;
|
||||
|
||||
// Returns the amount of buffered data. This doesn't include packets that are
|
||||
// e.g. inflight.
|
||||
virtual size_t buffered_amount(StreamID stream_id) const = 0;
|
||||
|
||||
// Returns the limit for the `OnBufferedAmountLow` event. Default value is 0.
|
||||
virtual size_t buffered_amount_low_threshold(StreamID stream_id) const = 0;
|
||||
|
||||
// Sets a limit for the `OnBufferedAmountLow` event.
|
||||
virtual void SetBufferedAmountLowThreshold(StreamID stream_id,
|
||||
size_t bytes) = 0;
|
||||
};
|
||||
} // namespace dcsctp
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче