From e39f1b590785c8b53e2b0ff7187d71691b6d2837 Mon Sep 17 00:00:00 2001 From: Victor Boivie Date: Thu, 12 May 2022 22:38:51 +0200 Subject: [PATCH] dcsctp: Add priority support to send queue This mainly modifies the stream scheduler to add a weighted fair queuing algorithm in addition to its round robin algorithm. The WFQ algorithm is selected whenever interleaving is enabled, to ensure that the socket stays backwards compatible in the normal (non-interleaved) scenario. Adaptation to send queue and socket comes in a follow-up CL. Bug: webrtc:5696 Change-Id: I8f0dbfa8c2f40f2e84cee536ea821e7ef4af6310 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261947 Reviewed-by: Harald Alvestrand Commit-Queue: Victor Boivie Cr-Commit-Position: refs/heads/main@{#37330} --- net/dcsctp/tx/BUILD.gn | 2 + net/dcsctp/tx/rr_send_queue.cc | 4 +- net/dcsctp/tx/rr_send_queue.h | 4 +- net/dcsctp/tx/stream_scheduler.cc | 48 ++- net/dcsctp/tx/stream_scheduler.h | 56 +++- net/dcsctp/tx/stream_scheduler_test.cc | 385 ++++++++++++++++++++++++- 6 files changed, 460 insertions(+), 39 deletions(-) diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index ae81db8c67..e691b76138 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -55,7 +55,9 @@ rtc_library("stream_scheduler") { "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_set", "../common:str_join", + "../packet:chunk", "../packet:data", + "../packet:sctp_packet", "../public:socket", "../public:types", ] diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index ee8bf828d0..bec6f08def 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -39,6 +39,8 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, : log_prefix_(std::string(log_prefix) + "fcfs: "), buffer_size_(buffer_size), default_priority_(default_priority), + // TODO(webrtc:5696): Provide correct MTU. + scheduler_(DcSctpOptions::kMaxSafeMTUSize), on_buffered_amount_low_(std::move(on_buffered_amount_low)), total_buffered_amount_(std::move(on_total_buffered_amount_low)) { total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); @@ -482,7 +484,7 @@ void RRSendQueue::SetStreamPriority(StreamID stream_id, StreamPriority priority) { OutgoingStream& stream = GetOrCreateStreamInfo(stream_id); - stream.set_priority(priority); + stream.SetPriority(priority); RTC_DCHECK(IsConsistent()); } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index eea814c310..c2f1ee8e73 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -171,8 +171,8 @@ class RRSendQueue : public SendQueue { bool has_partially_sent_message() const; StreamPriority priority() const { return scheduler_stream_->priority(); } - void set_priority(StreamPriority priority) { - scheduler_stream_->set_priority(priority); + void SetPriority(StreamPriority priority) { + scheduler_stream_->SetPriority(priority); } void AddHandoverState( diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc index 1d15aec4da..d1560a75e4 100644 --- a/net/dcsctp/tx/stream_scheduler.cc +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -9,6 +9,8 @@ */ #include "net/dcsctp/tx/stream_scheduler.h" +#include + #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" @@ -23,14 +25,19 @@ namespace dcsctp { -void StreamScheduler::Stream::set_priority(StreamPriority priority) { +void StreamScheduler::Stream::SetPriority(StreamPriority priority) { priority_ = priority; + inverse_weight_ = InverseWeight(priority); } absl::optional StreamScheduler::Produce( TimeMs now, size_t max_size) { - bool rescheduling = !currently_sending_a_message_; + // For non-interleaved streams, avoid rescheduling while still sending a + // message as it needs to be sent in full. For interleaved messaging, + // reschedule for every I-DATA chunk sent. + bool rescheduling = + enable_message_interleaving_ || !currently_sending_a_message_; RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling << ", active=" @@ -92,7 +99,7 @@ absl::optional StreamScheduler::Produce( // in `active_streams`. size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message(); if (rescheduling && bytes_to_send_next > 0) { - current_stream_->MakeActive(); + current_stream_->MakeActive(bytes_to_send_next); } else if (!rescheduling && bytes_to_send_next == 0) { current_stream_->MakeInactive(); } @@ -101,13 +108,19 @@ absl::optional StreamScheduler::Produce( return data; } -StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime() - const { - // Implement round-robin by letting the stream have its next virtual finish - // time in the future. It doesn't matter how far into the future, just any - // positive number so that any other stream that has the same virtual finish - // time as this stream gets to produce their data before revisiting this - // stream. +StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( + size_t bytes_to_send_next) const { + if (parent_.enable_message_interleaving_) { + // Perform weighted fair queuing scheduling. + return VirtualTime(*current_virtual_time_ + + bytes_to_send_next * *inverse_weight_); + } + + // Perform round-robin scheduling by letting the stream have its next virtual + // finish time in the future. It doesn't matter how far into the future, just + // any positive number so that any other stream that has the same virtual + // finish time as this stream gets to produce their data before revisiting + // this stream. return VirtualTime(*current_virtual_time_ + 1); } @@ -117,7 +130,7 @@ absl::optional StreamScheduler::Stream::Produce( absl::optional data = producer_.Produce(now, max_size); if (data.has_value()) { - VirtualTime new_current = GetNextFinishTime(); + VirtualTime new_current = CalculateFinishTime(data->data.payload.size()); RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_ << " -> " << *new_current; current_virtual_time_ = new_current; @@ -140,19 +153,22 @@ bool StreamScheduler::IsConsistent() const { void StreamScheduler::Stream::MaybeMakeActive() { RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")"; RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); - if (bytes_to_send_in_next_message() == 0) { + size_t bytes_to_send_next = bytes_to_send_in_next_message(); + if (bytes_to_send_next == 0) { return; } - MakeActive(); + MakeActive(bytes_to_send_next); } -void StreamScheduler::Stream::MakeActive() { +void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) { current_virtual_time_ = parent_.virtual_time_; - VirtualTime next_finish_time = GetNextFinishTime(); + RTC_DCHECK_GT(bytes_to_send_next, 0); + VirtualTime next_finish_time = CalculateFinishTime( + std::min(bytes_to_send_next, parent_.max_payload_bytes_)); + RTC_DCHECK_GT(*next_finish_time, 0); RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " active, expiring at " << *next_finish_time; - RTC_DCHECK_GT(*next_finish_time, 0); RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); next_finish_time_ = next_finish_time; RTC_DCHECK(!absl::c_any_of(parent_.active_streams_, diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h index e76f474248..9c523edbfc 100644 --- a/net/dcsctp/tx/stream_scheduler.h +++ b/net/dcsctp/tx/stream_scheduler.h @@ -10,6 +10,7 @@ #ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ #define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ +#include #include #include #include @@ -24,6 +25,8 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" @@ -42,9 +45,21 @@ namespace dcsctp { // with a "virtual finish time", which is the time when a stream is allowed to // produce data. Streams are ordered by their virtual finish time, and the // "current virtual time" will advance to the next following virtual finish time -// whenever a chunk is to be produced. In the initial round-robin scheduling -// algorithm, a stream's virtual finish time will just increment by one (1) -// after having produced a chunk, which results in a round-robin scheduling. +// whenever a chunk is to be produced. +// +// When message interleaving is enabled, the WFQ - Weighted Fair Queueing - +// scheduling algorithm will be used. And when it's not, round-robin scheduling +// will be used instead. +// +// In the round robin scheduling algorithm, a stream's virtual finish time will +// just increment by one (1) after having produced a chunk, which results in a +// round-robin scheduling. +// +// In WFQ scheduling algorithm, a stream's virtual finish time will be defined +// as the number of bytes in the next fragment to be sent, multiplied by the +// inverse of the stream's priority, meaning that a high priority - or a smaller +// fragment - results in a closer virtual finish time, compared to a stream with +// either a lower priority or a larger fragment to be sent. class StreamScheduler { private: class VirtualTime : public webrtc::StrongAlias { @@ -54,6 +69,13 @@ class StreamScheduler { static constexpr VirtualTime Zero() { return VirtualTime(0); } }; + class InverseWeight + : public webrtc::StrongAlias { + public: + constexpr explicit InverseWeight(StreamPriority priority) + : webrtc::StrongAlias( + 1.0 / std::max(static_cast(*priority), 0.000001)) {} + }; public: class StreamProducer { @@ -79,7 +101,7 @@ class StreamScheduler { StreamID stream_id() const { return stream_id_; } StreamPriority priority() const { return priority_; } - void set_priority(StreamPriority priority); + void SetPriority(StreamPriority priority); // Will activate the stream _if_ it has any data to send. That is, if the // callback to `bytes_to_send_in_next_message` returns non-zero. If the @@ -105,13 +127,14 @@ class StreamScheduler { : parent_(*parent), producer_(*producer), stream_id_(stream_id), - priority_(priority) {} + priority_(priority), + inverse_weight_(priority) {} // Produces a message from this stream. This will only be called on streams // that have data. absl::optional Produce(TimeMs now, size_t max_size); - void MakeActive(); + void MakeActive(size_t bytes_to_send_next); void ForceMarkInactive(); VirtualTime current_time() const { return current_virtual_time_; } @@ -120,24 +143,34 @@ class StreamScheduler { return producer_.bytes_to_send_in_next_message(); } - // Returns the next virtual finish time for this stream. - VirtualTime GetNextFinishTime() const; + VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const; StreamScheduler& parent_; StreamProducer& producer_; const StreamID stream_id_; StreamPriority priority_; + InverseWeight inverse_weight_; // This outgoing stream's "current" virtual_time. VirtualTime current_virtual_time_ = VirtualTime::Zero(); VirtualTime next_finish_time_ = VirtualTime::Zero(); }; + // The `mtu` parameter represents the maximum SCTP packet size, which should + // be the same as `DcSctpOptions::mtu`. + explicit StreamScheduler(size_t mtu) + : max_payload_bytes_(mtu - SctpPacket::kHeaderSize - + IDataChunk::kHeaderSize) {} + std::unique_ptr CreateStream(StreamProducer* producer, StreamID stream_id, StreamPriority priority) { return absl::WrapUnique(new Stream(this, producer, stream_id, priority)); } + void EnableMessageInterleaving(bool enabled) { + enable_message_interleaving_ = enabled; + } + // Makes the scheduler stop producing message from the current stream and // re-evaluates which stream to produce from. void ForceReschedule() { currently_sending_a_message_ = false; } @@ -165,14 +198,19 @@ class StreamScheduler { bool IsConsistent() const; + const size_t max_payload_bytes_; + // The current virtual time, as defined in the WFQ algorithm. VirtualTime virtual_time_ = VirtualTime::Zero(); // The current stream to send chunks from. Stream* current_stream_ = nullptr; + bool enable_message_interleaving_ = false; + // Indicates if the streams is currently sending a message, and should then - // continue sending from this stream until that message has been sent in full. + // - if message interleaving is not enabled - continue sending from this + // stream until that message has been sent in full. bool currently_sending_a_message_ = false; // The currently active streams, ordered by virtual finish time. diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc index 0c239fe8b0..58f0bc4690 100644 --- a/net/dcsctp/tx/stream_scheduler_test.cc +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -11,6 +11,7 @@ #include +#include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/public/types.h" #include "test/gmock.h" @@ -91,14 +92,14 @@ class TestStream { // A scheduler without active streams doesn't produce data. TEST(StreamSchedulerTest, HasNoActiveStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); } // Stream properties can be set and retrieved TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer; auto stream = @@ -107,13 +108,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { EXPECT_EQ(stream->stream_id(), StreamID(1)); EXPECT_EQ(stream->priority(), StreamPriority(2)); - stream->set_priority(StreamPriority(0)); + stream->SetPriority(StreamPriority(0)); EXPECT_EQ(stream->priority(), StreamPriority(0)); } // A scheduler with a single stream produced packets from it. TEST(StreamSchedulerTest, CanProduceFromSingleStream) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer; EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); @@ -130,7 +131,7 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { // Switches between two streams after every packet. TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -172,7 +173,7 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { // Switches between two streams after every packet, but keeps producing from the // same stream when a packet contains of multiple fragments. TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -234,7 +235,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { // Deactivates a stream before it has finished producing all packets. TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -258,7 +259,7 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { // Resumes a paused stream - makes a stream active after inactivating it. TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; // Callbacks are setup so that they hint that there is a MID(2) coming... @@ -288,7 +289,7 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { // Iterates between streams, where one is suddenly paused and later resumed. TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -332,7 +333,7 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { // Verifies that packet counts are evenly distributed in round robin scheduling. TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); @@ -345,7 +346,7 @@ TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { // where a stream is suddenly made inactive, two are added, and then the paused // stream is resumed. TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); @@ -373,5 +374,367 @@ TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { EXPECT_EQ(counts3[StreamID(4)], 5U); } +// Degrades to fair queuing with streams having identical priority. +TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 30; + constexpr size_t kLargePacket = 70; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kLargePacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + // t = 30 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t = 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t = 70 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t = 90 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + // t = 140 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t = 210 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units. + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units. + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + StrictMock callback3; + EXPECT_CALL(callback3, Produce) + .WillOnce(CreateChunk(StreamID(3), MID(300))) + .WillOnce(CreateChunk(StreamID(3), MID(301))) + .WillOnce(CreateChunk(StreamID(3), MID(302))); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units. + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 20 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 40 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 50 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 80 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 100 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 150 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 160 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 240 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority +// and sending different payload sizes. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 20; + constexpr size_t kMediumPacket = 50; + constexpr size_t kLargePacket = 70; + + // Stream with priority = 125 -> inverse weight ~=80 + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + // virtual finish time ~ 0 + 50 * 80 = 4000 + .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket)) + // virtual finish time ~ 4000 + 20 * 80 = 5600 + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + // virtual finish time ~ 5600 + 70 * 80 = 11200 + .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + // Stream with priority = 200 -> inverse weight ~=50 + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + // virtual finish time ~ 0 + 50 * 50 = 2500 + .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket)) + // virtual finish time ~ 2500 + 70 * 50 = 6000 + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + // virtual finish time ~ 6000 + 20 * 50 = 7000 + .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + // Stream with priority = 500 -> inverse weight ~=20 + StrictMock callback3; + EXPECT_CALL(callback3, Produce) + // virtual finish time ~ 0 + 20 * 20 = 400 + .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket)) + // virtual finish time ~ 400 + 50 * 20 = 1400 + .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket)) + // virtual finish time ~ 1400 + 70 * 20 = 2800 + .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket)); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kMediumPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 1400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 2500 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 2800 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 4000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 5600 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 6000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 7000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 11200 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { + // A simple test with two streams of different priority, but sending packets + // of identical size. Verifies that the ratio of sent packets represent their + // priority. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + + std::map packet_counts = GetPacketCounts(scheduler, 15); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); +} + +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) { + // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more + // streams. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize); + + std::map packet_counts = GetPacketCounts(scheduler, 50); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) { + // A simple test with two streams of different priority, but sending packets + // of different size. Verifies that the ratio of total packet payload + // represent their priority. + // In this example, + // * stream1 has priority 100 and sends packets of size 8 + // * stream2 has priority 400 and sends packets of size 4 + // With round robin, stream1 would get twice as many payload bytes on the wire + // as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as + // many payload bytes on the wire. That translates to stream2 getting 8x as + // many packets on the wire as they are half as large. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/8); + TestStream stream2(scheduler, StreamID(2), StreamPriority(400), + /*packet_size=*/4); + + std::map packet_counts = GetPacketCounts(scheduler, 90); + EXPECT_EQ(packet_counts[StreamID(1)], 10U); + EXPECT_EQ(packet_counts[StreamID(2)], 80U); +} + +TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) { + // Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more + // complicated. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/10); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), + /*packet_size=*/10); + TestStream stream3(scheduler, StreamID(3), StreamPriority(200), + /*packet_size=*/20); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), + /*packet_size=*/30); + + std::map packet_counts = GetPacketCounts(scheduler, 80); + // 15 packets * 10 bytes = 150 bytes at priority 100. + EXPECT_EQ(packet_counts[StreamID(1)], 15U); + // 30 packets * 10 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(2)], 30U); + // 15 packets * 20 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + // 20 packets * 30 bytes = 600 bytes at priority 400. + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +// Sending large messages with small MTU will fragment the messages and produce +// a first fragment not larger than the MTU, and will then not first send from +// the stream with the smallest message, as their first fragment will be equally +// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but +// with a larger MTU. +TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { + StreamScheduler scheduler(100 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(100)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 100)) + .WillOnce(CreateChunk(StreamID(2), MID(1), 50)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(50)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Sending large messages with large MTU will not fragment messages and will +// send the message first from the stream that has the smallest message. +TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { + StreamScheduler scheduler(200 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 200)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 150)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + } // namespace } // namespace dcsctp