diff --git a/net/dcsctp/tx/BUILD.gn b/net/dcsctp/tx/BUILD.gn index ae81db8c67..e691b76138 100644 --- a/net/dcsctp/tx/BUILD.gn +++ b/net/dcsctp/tx/BUILD.gn @@ -55,7 +55,9 @@ rtc_library("stream_scheduler") { "../../../rtc_base:strong_alias", "../../../rtc_base/containers:flat_set", "../common:str_join", + "../packet:chunk", "../packet:data", + "../packet:sctp_packet", "../public:socket", "../public:types", ] diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index ee8bf828d0..bec6f08def 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -39,6 +39,8 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix, : log_prefix_(std::string(log_prefix) + "fcfs: "), buffer_size_(buffer_size), default_priority_(default_priority), + // TODO(webrtc:5696): Provide correct MTU. + scheduler_(DcSctpOptions::kMaxSafeMTUSize), on_buffered_amount_low_(std::move(on_buffered_amount_low)), total_buffered_amount_(std::move(on_total_buffered_amount_low)) { total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); @@ -482,7 +484,7 @@ void RRSendQueue::SetStreamPriority(StreamID stream_id, StreamPriority priority) { OutgoingStream& stream = GetOrCreateStreamInfo(stream_id); - stream.set_priority(priority); + stream.SetPriority(priority); RTC_DCHECK(IsConsistent()); } diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index eea814c310..c2f1ee8e73 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -171,8 +171,8 @@ class RRSendQueue : public SendQueue { bool has_partially_sent_message() const; StreamPriority priority() const { return scheduler_stream_->priority(); } - void set_priority(StreamPriority priority) { - scheduler_stream_->set_priority(priority); + void SetPriority(StreamPriority priority) { + scheduler_stream_->SetPriority(priority); } void AddHandoverState( diff --git a/net/dcsctp/tx/stream_scheduler.cc b/net/dcsctp/tx/stream_scheduler.cc index 1d15aec4da..d1560a75e4 100644 --- a/net/dcsctp/tx/stream_scheduler.cc +++ b/net/dcsctp/tx/stream_scheduler.cc @@ -9,6 +9,8 @@ */ #include "net/dcsctp/tx/stream_scheduler.h" +#include <algorithm> + #include "absl/algorithm/container.h" #include "absl/types/optional.h" #include "api/array_view.h" @@ -23,14 +25,19 @@ namespace dcsctp { -void StreamScheduler::Stream::set_priority(StreamPriority priority) { +void StreamScheduler::Stream::SetPriority(StreamPriority priority) { priority_ = priority; + inverse_weight_ = InverseWeight(priority); } absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( TimeMs now, size_t max_size) { - bool rescheduling = !currently_sending_a_message_; + // For non-interleaved streams, avoid rescheduling while still sending a + // message as it needs to be sent in full. For interleaved messaging, + // reschedule for every I-DATA chunk sent. + bool rescheduling = + enable_message_interleaving_ || !currently_sending_a_message_; RTC_LOG(LS_VERBOSE) << "Producing data, rescheduling=" << rescheduling << ", active=" @@ -92,7 +99,7 @@ absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( // in `active_streams`. 
size_t bytes_to_send_next = current_stream_->bytes_to_send_in_next_message(); if (rescheduling && bytes_to_send_next > 0) { - current_stream_->MakeActive(); + current_stream_->MakeActive(bytes_to_send_next); } else if (!rescheduling && bytes_to_send_next == 0) { current_stream_->MakeInactive(); } @@ -101,13 +108,19 @@ absl::optional<SendQueue::DataToSend> StreamScheduler::Produce( return data; } -StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime() - const { - // Implement round-robin by letting the stream have its next virtual finish - // time in the future. It doesn't matter how far into the future, just any - // positive number so that any other stream that has the same virtual finish - // time as this stream gets to produce their data before revisiting this - // stream. +StreamScheduler::VirtualTime StreamScheduler::Stream::CalculateFinishTime( + size_t bytes_to_send_next) const { + if (parent_.enable_message_interleaving_) { + // Perform weighted fair queuing scheduling. + return VirtualTime(*current_virtual_time_ + + bytes_to_send_next * *inverse_weight_); + } + + // Perform round-robin scheduling by letting the stream have its next virtual + // finish time in the future. It doesn't matter how far into the future, just + // any positive number so that any other stream that has the same virtual + // finish time as this stream gets to produce their data before revisiting + // this stream. return VirtualTime(*current_virtual_time_ + 1); } @@ -117,7 +130,7 @@ absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce( absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size); if (data.has_value()) { - VirtualTime new_current = GetNextFinishTime(); + VirtualTime new_current = CalculateFinishTime(data->data.payload.size()); RTC_DLOG(LS_VERBOSE) << "Virtual time changed: " << *current_virtual_time_ << " -> " << *new_current; current_virtual_time_ = new_current; @@ -140,19 +153,22 @@ bool StreamScheduler::IsConsistent() const { void StreamScheduler::Stream::MaybeMakeActive() { RTC_DLOG(LS_VERBOSE) << "MaybeMakeActive(" << *stream_id() << ")"; RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); - if (bytes_to_send_in_next_message() == 0) { + size_t bytes_to_send_next = bytes_to_send_in_next_message(); + if (bytes_to_send_next == 0) { return; } - MakeActive(); + MakeActive(bytes_to_send_next); } -void StreamScheduler::Stream::MakeActive() { +void StreamScheduler::Stream::MakeActive(size_t bytes_to_send_next) { current_virtual_time_ = parent_.virtual_time_; - VirtualTime next_finish_time = GetNextFinishTime(); + RTC_DCHECK_GT(bytes_to_send_next, 0); + VirtualTime next_finish_time = CalculateFinishTime( + std::min(bytes_to_send_next, parent_.max_payload_bytes_)); + RTC_DCHECK_GT(*next_finish_time, 0); RTC_DLOG(LS_VERBOSE) << "Making stream " << *stream_id() << " active, expiring at " << *next_finish_time; - RTC_DCHECK_GT(*next_finish_time, 0); RTC_DCHECK(next_finish_time_ == VirtualTime::Zero()); next_finish_time_ = next_finish_time; RTC_DCHECK(!absl::c_any_of(parent_.active_streams_, diff --git a/net/dcsctp/tx/stream_scheduler.h b/net/dcsctp/tx/stream_scheduler.h index e76f474248..9c523edbfc 100644 --- a/net/dcsctp/tx/stream_scheduler.h +++ b/net/dcsctp/tx/stream_scheduler.h @@ -10,6 +10,7 @@ #ifndef NET_DCSCTP_TX_STREAM_SCHEDULER_H_ #define NET_DCSCTP_TX_STREAM_SCHEDULER_H_ +#include <algorithm> #include #include #include @@ -24,6 +25,8 @@ #include "absl/strings/string_view.h" #include "absl/types/optional.h" #include "api/array_view.h" +#include "net/dcsctp/packet/chunk/idata_chunk.h" +#include 
"net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/public/dcsctp_message.h" #include "net/dcsctp/public/dcsctp_socket.h" #include "net/dcsctp/public/types.h" @@ -42,9 +45,21 @@ namespace dcsctp { // with a "virtual finish time", which is the time when a stream is allowed to // produce data. Streams are ordered by their virtual finish time, and the // "current virtual time" will advance to the next following virtual finish time -// whenever a chunk is to be produced. In the initial round-robin scheduling -// algorithm, a stream's virtual finish time will just increment by one (1) -// after having produced a chunk, which results in a round-robin scheduling. +// whenever a chunk is to be produced. +// +// When message interleaving is enabled, the WFQ - Weighted Fair Queueing - +// scheduling algorithm will be used. And when it's not, round-robin scheduling +// will be used instead. +// +// In the round robin scheduling algorithm, a stream's virtual finish time will +// just increment by one (1) after having produced a chunk, which results in a +// round-robin scheduling. +// +// In WFQ scheduling algorithm, a stream's virtual finish time will be defined +// as the number of bytes in the next fragment to be sent, multiplied by the +// inverse of the stream's priority, meaning that a high priority - or a smaller +// fragment - results in a closer virtual finish time, compared to a stream with +// either a lower priority or a larger fragment to be sent. class StreamScheduler { private: class VirtualTime : public webrtc::StrongAlias { @@ -54,6 +69,13 @@ class StreamScheduler { static constexpr VirtualTime Zero() { return VirtualTime(0); } }; + class InverseWeight + : public webrtc::StrongAlias { + public: + constexpr explicit InverseWeight(StreamPriority priority) + : webrtc::StrongAlias( + 1.0 / std::max(static_cast(*priority), 0.000001)) {} + }; public: class StreamProducer { @@ -79,7 +101,7 @@ class StreamScheduler { StreamID stream_id() const { return stream_id_; } StreamPriority priority() const { return priority_; } - void set_priority(StreamPriority priority); + void SetPriority(StreamPriority priority); // Will activate the stream _if_ it has any data to send. That is, if the // callback to `bytes_to_send_in_next_message` returns non-zero. If the @@ -105,13 +127,14 @@ class StreamScheduler { : parent_(*parent), producer_(*producer), stream_id_(stream_id), - priority_(priority) {} + priority_(priority), + inverse_weight_(priority) {} // Produces a message from this stream. This will only be called on streams // that have data. absl::optional Produce(TimeMs now, size_t max_size); - void MakeActive(); + void MakeActive(size_t bytes_to_send_next); void ForceMarkInactive(); VirtualTime current_time() const { return current_virtual_time_; } @@ -120,24 +143,34 @@ class StreamScheduler { return producer_.bytes_to_send_in_next_message(); } - // Returns the next virtual finish time for this stream. - VirtualTime GetNextFinishTime() const; + VirtualTime CalculateFinishTime(size_t bytes_to_send_next) const; StreamScheduler& parent_; StreamProducer& producer_; const StreamID stream_id_; StreamPriority priority_; + InverseWeight inverse_weight_; // This outgoing stream's "current" virtual_time. VirtualTime current_virtual_time_ = VirtualTime::Zero(); VirtualTime next_finish_time_ = VirtualTime::Zero(); }; + // The `mtu` parameter represents the maximum SCTP packet size, which should + // be the same as `DcSctpOptions::mtu`. 
+ explicit StreamScheduler(size_t mtu) + : max_payload_bytes_(mtu - SctpPacket::kHeaderSize - + IDataChunk::kHeaderSize) {} + std::unique_ptr<Stream> CreateStream(StreamProducer* producer, StreamID stream_id, StreamPriority priority) { return absl::WrapUnique(new Stream(this, producer, stream_id, priority)); } + void EnableMessageInterleaving(bool enabled) { + enable_message_interleaving_ = enabled; + } + // Makes the scheduler stop producing messages from the current stream and // re-evaluates which stream to produce from. void ForceReschedule() { currently_sending_a_message_ = false; } @@ -165,14 +198,19 @@ class StreamScheduler { bool IsConsistent() const; + const size_t max_payload_bytes_; + // The current virtual time, as defined in the WFQ algorithm. VirtualTime virtual_time_ = VirtualTime::Zero(); // The current stream to send chunks from. Stream* current_stream_ = nullptr; + bool enable_message_interleaving_ = false; + // Indicates if the stream is currently sending a message, and should then - // continue sending from this stream until that message has been sent in full. + // - if message interleaving is not enabled - continue sending from this + // stream until that message has been sent in full. bool currently_sending_a_message_ = false; // The currently active streams, ordered by virtual finish time. diff --git a/net/dcsctp/tx/stream_scheduler_test.cc b/net/dcsctp/tx/stream_scheduler_test.cc index 0c239fe8b0..58f0bc4690 100644 --- a/net/dcsctp/tx/stream_scheduler_test.cc +++ b/net/dcsctp/tx/stream_scheduler_test.cc @@ -11,6 +11,7 @@ #include +#include "net/dcsctp/packet/sctp_packet.h" #include "net/dcsctp/public/types.h" #include "test/gmock.h" @@ -91,14 +92,14 @@ class TestStream { // A scheduler without active streams doesn't produce data. TEST(StreamSchedulerTest, HasNoActiveStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); } // Stream properties can be set and retrieved TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer; auto stream = @@ -107,13 +108,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) { EXPECT_EQ(stream->stream_id(), StreamID(1)); EXPECT_EQ(stream->priority(), StreamPriority(2)); - stream->set_priority(StreamPriority(0)); + stream->SetPriority(StreamPriority(0)); EXPECT_EQ(stream->priority(), StreamPriority(0)); } // A scheduler with a single stream produced packets from it. TEST(StreamSchedulerTest, CanProduceFromSingleStream) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer; EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0))); @@ -130,7 +131,7 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) { // Switches between two streams after every packet. TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -172,7 +173,7 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) { // Switches between two streams after every packet, but keeps producing from the // same stream when a packet consists of multiple fragments. 
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -234,7 +235,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) { // Deactivates a stream before it has finished producing all packets. TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -258,7 +259,7 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) { // Resumes a paused stream - makes a stream active after inactivating it. TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; // Callbacks are setup so that they hint that there is a MID(2) coming... @@ -288,7 +289,7 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) { // Iterates between streams, where one is suddenly paused and later resumed. TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); StrictMock producer1; EXPECT_CALL(producer1, Produce) @@ -332,7 +333,7 @@ TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) { // Verifies that packet counts are evenly distributed in round robin scheduling. TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); @@ -345,7 +346,7 @@ TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) { // where a stream is suddenly made inactive, two are added, and then the paused // stream is resumed. TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { - StreamScheduler scheduler; + StreamScheduler scheduler(kMtu); TestStream stream1(scheduler, StreamID(1), StreamPriority(1)); TestStream stream2(scheduler, StreamID(2), StreamPriority(1)); @@ -373,5 +374,367 @@ TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) { EXPECT_EQ(counts3[StreamID(4)], 5U); } +// Degrades to fair queuing with streams having identical priority. 
+TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 30; + constexpr size_t kLargePacket = 70; + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + .WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + .WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kLargePacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2)); + stream2->MaybeMakeActive(); + + // t = 30 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t = 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t = 70 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t = 90 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + // t = 140 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t = 210 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(100))) + .WillOnce(CreateChunk(StreamID(1), MID(101))) + .WillOnce(CreateChunk(StreamID(1), MID(102))); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 125 -> allowed to produce every 1000/125 ~= 80 time units. + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(200))) + .WillOnce(CreateChunk(StreamID(2), MID(201))) + .WillOnce(CreateChunk(StreamID(2), MID(202))); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 200 -> allowed to produce every 1000/200 ~= 50 time units. 
+ auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + StrictMock callback3; + EXPECT_CALL(callback3, Produce) + .WillOnce(CreateChunk(StreamID(3), MID(300))) + .WillOnce(CreateChunk(StreamID(3), MID(301))) + .WillOnce(CreateChunk(StreamID(3), MID(302))); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kPayloadSize)) // When making active + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(kPayloadSize)) + .WillOnce(Return(0)); + // Priority 500 -> allowed to produce every 1000/500 ~= 20 time units. + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 20 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 40 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 50 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 60 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 80 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 100 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 150 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 160 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 240 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Will do weighted fair queuing with three streams having different priority +// and sending different payload sizes. +TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) { + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + constexpr size_t kSmallPacket = 20; + constexpr size_t kMediumPacket = 50; + constexpr size_t kLargePacket = 70; + + // Stream with priority = 125 -> inverse weight ~=80 + StrictMock callback1; + EXPECT_CALL(callback1, Produce) + // virtual finish time ~ 0 + 50 * 80 = 4000 + .WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket)) + // virtual finish time ~ 4000 + 20 * 80 = 5600 + .WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket)) + // virtual finish time ~ 5600 + 70 * 80 = 11200 + .WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket)); + EXPECT_CALL(callback1, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125)); + stream1->MaybeMakeActive(); + + // Stream with priority = 200 -> inverse weight ~=50 + StrictMock callback2; + EXPECT_CALL(callback2, Produce) + // virtual finish time ~ 0 + 50 * 50 = 2500 + .WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket)) + // virtual finish time ~ 2500 + 70 * 50 = 6000 + .WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket)) + // virtual finish time ~ 6000 + 20 * 50 = 7000 + .WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket)); + EXPECT_CALL(callback2, bytes_to_send_in_next_message) + .WillOnce(Return(kMediumPacket)) // When making active + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(kSmallPacket)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200)); + stream2->MaybeMakeActive(); + + // 
Stream with priority = 500 -> inverse weight ~=20 + StrictMock callback3; + EXPECT_CALL(callback3, Produce) + // virtual finish time ~ 0 + 20 * 20 = 400 + .WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket)) + // virtual finish time ~ 400 + 50 * 20 = 1400 + .WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket)) + // virtual finish time ~ 1400 + 70 * 20 = 2800 + .WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket)); + EXPECT_CALL(callback3, bytes_to_send_in_next_message) + .WillOnce(Return(kSmallPacket)) // When making active + .WillOnce(Return(kMediumPacket)) + .WillOnce(Return(kLargePacket)) + .WillOnce(Return(0)); + auto stream3 = + scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500)); + stream3->MaybeMakeActive(); + + // t ~= 400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300))); + // t ~= 1400 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301))); + // t ~= 2500 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200))); + // t ~= 2800 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302))); + // t ~= 4000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100))); + // t ~= 5600 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101))); + // t ~= 6000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201))); + // t ~= 7000 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202))); + // t ~= 11200 + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) { + // A simple test with two streams of different priority, but sending packets + // of identical size. Verifies that the ratio of sent packets represents their + // priority. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); +} + +TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) { + // Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more + // streams. + StreamScheduler scheduler(kMtu); + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize); + TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50); + EXPECT_EQ(packet_counts[StreamID(1)], 5U); + EXPECT_EQ(packet_counts[StreamID(2)], 10U); + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) { + // A simple test with two streams of different priority, but sending packets + // of different size. Verifies that the ratio of total packet payload + // represents their priority. 
+ // In this example, + // * stream1 has priority 100 and sends packets of size 8 + // * stream2 has priority 400 and sends packets of size 4 + // With round robin, stream1 would get twice as many payload bytes on the wire + // as stream2, but with WFQ and a 4x priority increase, stream2 should get 4x as + // many payload bytes on the wire. That translates to stream2 getting 8x as + // many packets on the wire as they are half as large. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/8); + TestStream stream2(scheduler, StreamID(2), StreamPriority(400), + /*packet_size=*/4); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90); + EXPECT_EQ(packet_counts[StreamID(1)], 10U); + EXPECT_EQ(packet_counts[StreamID(2)], 80U); +} + +TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) { + // Same as `WillDistributeFromTwoStreamsFairly` but more + // complicated. + StreamScheduler scheduler(kMtu); + // Enable WFQ scheduler. + scheduler.EnableMessageInterleaving(true); + + TestStream stream1(scheduler, StreamID(1), StreamPriority(100), + /*packet_size=*/10); + TestStream stream2(scheduler, StreamID(2), StreamPriority(200), + /*packet_size=*/10); + TestStream stream3(scheduler, StreamID(3), StreamPriority(200), + /*packet_size=*/20); + TestStream stream4(scheduler, StreamID(4), StreamPriority(400), + /*packet_size=*/30); + + std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80); + // 15 packets * 10 bytes = 150 bytes at priority 100. + EXPECT_EQ(packet_counts[StreamID(1)], 15U); + // 30 packets * 10 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(2)], 30U); + // 15 packets * 20 bytes = 300 bytes at priority 200. + EXPECT_EQ(packet_counts[StreamID(3)], 15U); + // 20 packets * 30 bytes = 600 bytes at priority 400. + EXPECT_EQ(packet_counts[StreamID(4)], 20U); +} + +// Sending large messages with small MTU will fragment the messages and produce +// a first fragment not larger than the MTU, and will then not first send from +// the stream with the smallest message, as their first fragment will be equally +// small for both streams. See `SendLargeMessageWithLargeMtu` for the same test, but +// with a larger MTU. 
+TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) { + StreamScheduler scheduler(100 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)) + .WillOnce(CreateChunk(StreamID(1), MID(0), 100)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(100)) + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 100)) + .WillOnce(CreateChunk(StreamID(2), MID(1), 50)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(50)) + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + +// Sending large messages with large MTU will not fragment messages and will +// send the message first from the stream that has the smallest message. +TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) { + StreamScheduler scheduler(200 + SctpPacket::kHeaderSize + + IDataChunk::kHeaderSize); + scheduler.EnableMessageInterleaving(true); + + StrictMock producer1; + EXPECT_CALL(producer1, Produce) + .WillOnce(CreateChunk(StreamID(1), MID(0), 200)); + EXPECT_CALL(producer1, bytes_to_send_in_next_message) + .WillOnce(Return(200)) // When making active + .WillOnce(Return(0)); + auto stream1 = + scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1)); + stream1->MaybeMakeActive(); + + StrictMock producer2; + EXPECT_CALL(producer2, Produce) + .WillOnce(CreateChunk(StreamID(2), MID(1), 150)); + EXPECT_CALL(producer2, bytes_to_send_in_next_message) + .WillOnce(Return(150)) // When making active + .WillOnce(Return(0)); + auto stream2 = + scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1)); + stream2->MaybeMakeActive(); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1))); + EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0))); + EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt); +} + } // namespace } // namespace dcsctp
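
Reviewer note (illustration only, not part of the patch): when interleaving is enabled, CalculateFinishTime() implements the rule finish_time = stream_virtual_time + bytes_to_send_next * (1 / priority), and Produce() always picks the active stream with the earliest finish time. The standalone sketch below models just that rule with plain C++, no dcSCTP types; the stream names, priorities, MIDs and fragment sizes are copied from the WillDoWeightedFairQueuingDifferentSizeAndPriority test, and the x10000 print scale is an arbitrary choice made here only so the printed values line up with that test's "t ~=" annotations. It is a simplification (no MakeActive/MakeInactive bookkeeping, no capping at max_payload_bytes_, no 0.000001 guard against zero priority), but it reproduces the expected chunk order 300, 301, 200, 302, 100, 101, 201, 202, 102.

// wfq_sketch.cc - minimal model of the WFQ virtual-finish-time rule.
#include <cstddef>
#include <cstdio>
#include <vector>

namespace {

struct Msg {
  int mid;       // Message identifier, as used in the test expectations.
  size_t bytes;  // Size of the next fragment to produce.
};

struct SimStream {
  const char* name;
  double priority;            // Higher priority -> smaller inverse weight.
  std::vector<Msg> messages;  // Fragments, in the order they are produced.
  double virtual_time = 0.0;  // This stream's current virtual time.
  size_t next = 0;            // Index of the next fragment to produce.
};

}  // namespace

int main() {
  std::vector<SimStream> streams = {
      {"stream 1", 125, {{100, 50}, {101, 20}, {102, 70}}},
      {"stream 2", 200, {{200, 50}, {201, 70}, {202, 20}}},
      {"stream 3", 500, {{300, 20}, {301, 50}, {302, 70}}},
  };

  // Repeatedly produce from the stream whose next fragment has the earliest
  // virtual finish time, then advance that stream's virtual time to it.
  for (;;) {
    SimStream* best = nullptr;
    double best_finish = 0.0;
    for (SimStream& s : streams) {
      if (s.next >= s.messages.size()) {
        continue;  // Stream has been drained; it would be inactive.
      }
      // finish = current virtual time + bytes * inverse weight (1/priority).
      double finish = s.virtual_time + s.messages[s.next].bytes / s.priority;
      if (best == nullptr || finish < best_finish) {
        best = &s;
        best_finish = finish;
      }
    }
    if (best == nullptr) {
      break;  // Nothing left to send.
    }
    const Msg& msg = best->messages[best->next];
    // Scaled by 10000 to match the annotations in the unit test above.
    std::printf("MID %d (%zu bytes) from %s, virtual finish time ~%.0f\n",
                msg.mid, msg.bytes, best->name, best_finish * 10000);
    best->virtual_time = best_finish;
    ++best->next;
  }
  return 0;
}

Running it prints the fragments in the order 300, 301, 200, 302, 100, 101, 201, 202, 102 with finish times ~400, 1400, 2500, 2800, 4000, 5600, 6000, 7000 and 11200, matching the per-chunk comments in the test.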