
This mainly modifies the stream scheduler to add a weighted fair queuing algorithm in addition to its round robin algorithm. The WFQ algorithm is selected whenever interleaving is enabled, to ensure that the socket stays backwards compatible in the normal (non-interleaved) scenario. Adaptation to send queue and socket comes in a follow-up CL. Bug: webrtc:5696 Change-Id: I8f0dbfa8c2f40f2e84cee536ea821e7ef4af6310 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261947 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37330}
741 lines
30 KiB
C++
741 lines
30 KiB
C++
/*
|
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
|
*
|
|
* Use of this source code is governed by a BSD-style license
|
|
* that can be found in the LICENSE file in the root of the source
|
|
* tree. An additional intellectual property rights grant can be found
|
|
* in the file PATENTS. All contributing project authors may
|
|
* be found in the AUTHORS file in the root of the source tree.
|
|
*/
|
|
#include "net/dcsctp/tx/stream_scheduler.h"
|
|
|
|
#include <vector>
|
|
|
|
#include "net/dcsctp/packet/sctp_packet.h"
|
|
#include "net/dcsctp/public/types.h"
|
|
#include "test/gmock.h"
|
|
|
|
namespace dcsctp {
|
|
namespace {
|
|
using ::testing::Return;
|
|
using ::testing::StrictMock;
|
|
|
|
constexpr size_t kMtu = 1000;
|
|
constexpr size_t kPayloadSize = 4;
|
|
|
|
MATCHER_P(HasDataWithMid, mid, "") {
|
|
if (!arg.has_value()) {
|
|
*result_listener << "There was no produced data";
|
|
return false;
|
|
}
|
|
|
|
if (arg->data.message_id != mid) {
|
|
*result_listener << "the produced data had mid " << *arg->data.message_id
|
|
<< " and not the expected " << *mid;
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
std::function<absl::optional<SendQueue::DataToSend>(TimeMs, size_t)>
|
|
CreateChunk(StreamID sid, MID mid, size_t payload_size = kPayloadSize) {
|
|
return [sid, mid, payload_size](TimeMs now, size_t max_size) {
|
|
return SendQueue::DataToSend(Data(
|
|
sid, SSN(0), mid, FSN(0), PPID(42), std::vector<uint8_t>(payload_size),
|
|
Data::IsBeginning(true), Data::IsEnd(true), IsUnordered(true)));
|
|
};
|
|
}
|
|
|
|
std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
|
|
size_t packets_to_generate) {
|
|
std::map<StreamID, size_t> packet_counts;
|
|
for (size_t i = 0; i < packets_to_generate; ++i) {
|
|
absl::optional<SendQueue::DataToSend> data =
|
|
scheduler.Produce(TimeMs(0), kMtu);
|
|
if (data.has_value()) {
|
|
++packet_counts[data->data.stream_id];
|
|
}
|
|
}
|
|
return packet_counts;
|
|
}
|
|
|
|
class MockStreamProducer : public StreamScheduler::StreamProducer {
|
|
public:
|
|
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
|
|
Produce,
|
|
(TimeMs, size_t),
|
|
(override));
|
|
MOCK_METHOD(size_t, bytes_to_send_in_next_message, (), (const, override));
|
|
};
|
|
|
|
class TestStream {
|
|
public:
|
|
TestStream(StreamScheduler& scheduler,
|
|
StreamID stream_id,
|
|
StreamPriority priority,
|
|
size_t packet_size = kPayloadSize) {
|
|
EXPECT_CALL(producer_, Produce)
|
|
.WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
|
|
EXPECT_CALL(producer_, bytes_to_send_in_next_message)
|
|
.WillRepeatedly(Return(packet_size));
|
|
stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
|
|
stream_->MaybeMakeActive();
|
|
}
|
|
|
|
StreamScheduler::Stream& stream() { return *stream_; }
|
|
|
|
private:
|
|
StrictMock<MockStreamProducer> producer_;
|
|
std::unique_ptr<StreamScheduler::Stream> stream_;
|
|
};
|
|
|
|
// A scheduler without active streams doesn't produce data.
|
|
TEST(StreamSchedulerTest, HasNoActiveStreams) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Stream properties can be set and retrieved
|
|
TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer;
|
|
auto stream =
|
|
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
|
|
|
|
EXPECT_EQ(stream->stream_id(), StreamID(1));
|
|
EXPECT_EQ(stream->priority(), StreamPriority(2));
|
|
|
|
stream->SetPriority(StreamPriority(0));
|
|
EXPECT_EQ(stream->priority(), StreamPriority(0));
|
|
}
|
|
|
|
// A scheduler with a single stream produced packets from it.
|
|
TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer;
|
|
EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
|
|
EXPECT_CALL(producer, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream =
|
|
scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
|
|
stream->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Switches between two streams after every packet.
|
|
TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Switches between two streams after every packet, but keeps producing from the
|
|
// same stream when a packet contains of multiple fragments.
|
|
TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(true),
|
|
Data::IsEnd(false), IsUnordered(true)));
|
|
})
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(false),
|
|
Data::IsEnd(false), IsUnordered(true)));
|
|
})
|
|
.WillOnce([](...) {
|
|
return SendQueue::DataToSend(
|
|
Data(StreamID(1), SSN(0), MID(101), FSN(0), PPID(42),
|
|
std::vector<uint8_t>(4), Data::IsBeginning(false),
|
|
Data::IsEnd(true), IsUnordered(true)));
|
|
})
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Deactivates a stream before it has finished producing all packets.
|
|
TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)); // hints that there is a MID(2) coming.
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
|
|
// ... but the stream is made inactive before it can be produced.
|
|
stream1->MakeInactive();
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Resumes a paused stream - makes a stream active after inactivating it.
|
|
TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
// Callbacks are setup so that they hint that there is a MID(2) coming...
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)) // When making active again
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
|
|
stream1->MakeInactive();
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
stream1->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Iterates between streams, where one is suddenly paused and later resumed.
|
|
TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
|
|
StreamScheduler scheduler(kMtu);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102)));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202)));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
stream1->MakeInactive();
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
stream1->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Verifies that packet counts are evenly distributed in round robin scheduling.
|
|
TEST(StreamSchedulerTest, WillDistributeRoundRobinPacketsEvenlyTwoStreams) {
|
|
StreamScheduler scheduler(kMtu);
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
|
|
}
|
|
|
|
// Verifies that packet counts are evenly distributed among active streams,
|
|
// where a stream is suddenly made inactive, two are added, and then the paused
|
|
// stream is resumed.
|
|
TEST(StreamSchedulerTest, WillDistributeEvenlyWithPausedAndAddedStreams) {
|
|
StreamScheduler scheduler(kMtu);
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(1));
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 10);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 5U);
|
|
|
|
stream2.stream().MakeInactive();
|
|
|
|
TestStream stream3(scheduler, StreamID(3), StreamPriority(1));
|
|
TestStream stream4(scheduler, StreamID(4), StreamPriority(1));
|
|
|
|
std::map<StreamID, size_t> counts2 = GetPacketCounts(scheduler, 15);
|
|
EXPECT_EQ(counts2[StreamID(1)], 5U);
|
|
EXPECT_EQ(counts2[StreamID(2)], 0U);
|
|
EXPECT_EQ(counts2[StreamID(3)], 5U);
|
|
EXPECT_EQ(counts2[StreamID(4)], 5U);
|
|
|
|
stream2.stream().MaybeMakeActive();
|
|
|
|
std::map<StreamID, size_t> counts3 = GetPacketCounts(scheduler, 20);
|
|
EXPECT_EQ(counts3[StreamID(1)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(2)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(3)], 5U);
|
|
EXPECT_EQ(counts3[StreamID(4)], 5U);
|
|
}
|
|
|
|
// Degrades to fair queuing with streams having identical priority.
|
|
TEST(StreamSchedulerTest, WillDoFairQueuingWithSamePriority) {
|
|
StreamScheduler scheduler(kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
constexpr size_t kSmallPacket = 30;
|
|
constexpr size_t kLargePacket = 70;
|
|
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100), kSmallPacket))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102), kSmallPacket));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kSmallPacket)) // When making active
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200), kLargePacket))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202), kLargePacket));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kLargePacket)) // When making active
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
|
|
stream2->MaybeMakeActive();
|
|
|
|
// t = 30
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
// t = 60
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
// t = 70
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
// t = 90
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
// t = 140
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
// t = 210
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Will do weighted fair queuing with three streams having different priority.
|
|
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingSameSizeDifferentPriority) {
|
|
StreamScheduler scheduler(kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101)))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102)));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 125 -> allowed to produce every 1000/125 ~= 80 time units.
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201)))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202)));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 200 -> allowed to produce every 1000/200 ~= 50 time units.
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
|
|
stream2->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> callback3;
|
|
EXPECT_CALL(callback3, Produce)
|
|
.WillOnce(CreateChunk(StreamID(3), MID(300)))
|
|
.WillOnce(CreateChunk(StreamID(3), MID(301)))
|
|
.WillOnce(CreateChunk(StreamID(3), MID(302)));
|
|
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kPayloadSize)) // When making active
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(kPayloadSize))
|
|
.WillOnce(Return(0));
|
|
// Priority 500 -> allowed to produce every 1000/500 ~= 20 time units.
|
|
auto stream3 =
|
|
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
|
|
stream3->MaybeMakeActive();
|
|
|
|
// t ~= 20
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
|
|
// t ~= 40
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
|
|
// t ~= 50
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
// t ~= 60
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
|
|
// t ~= 80
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
// t ~= 100
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
// t ~= 150
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
// t ~= 160
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
// t ~= 240
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Will do weighted fair queuing with three streams having different priority
|
|
// and sending different payload sizes.
|
|
TEST(StreamSchedulerTest, WillDoWeightedFairQueuingDifferentSizeAndPriority) {
|
|
StreamScheduler scheduler(kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
constexpr size_t kSmallPacket = 20;
|
|
constexpr size_t kMediumPacket = 50;
|
|
constexpr size_t kLargePacket = 70;
|
|
|
|
// Stream with priority = 125 -> inverse weight ~=80
|
|
StrictMock<MockStreamProducer> callback1;
|
|
EXPECT_CALL(callback1, Produce)
|
|
// virtual finish time ~ 0 + 50 * 80 = 4000
|
|
.WillOnce(CreateChunk(StreamID(1), MID(100), kMediumPacket))
|
|
// virtual finish time ~ 4000 + 20 * 80 = 5600
|
|
.WillOnce(CreateChunk(StreamID(1), MID(101), kSmallPacket))
|
|
// virtual finish time ~ 5600 + 70 * 80 = 11200
|
|
.WillOnce(CreateChunk(StreamID(1), MID(102), kLargePacket));
|
|
EXPECT_CALL(callback1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kMediumPacket)) // When making active
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(125));
|
|
stream1->MaybeMakeActive();
|
|
|
|
// Stream with priority = 200 -> inverse weight ~=50
|
|
StrictMock<MockStreamProducer> callback2;
|
|
EXPECT_CALL(callback2, Produce)
|
|
// virtual finish time ~ 0 + 50 * 50 = 2500
|
|
.WillOnce(CreateChunk(StreamID(2), MID(200), kMediumPacket))
|
|
// virtual finish time ~ 2500 + 70 * 50 = 6000
|
|
.WillOnce(CreateChunk(StreamID(2), MID(201), kLargePacket))
|
|
// virtual finish time ~ 6000 + 20 * 50 = 7000
|
|
.WillOnce(CreateChunk(StreamID(2), MID(202), kSmallPacket));
|
|
EXPECT_CALL(callback2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kMediumPacket)) // When making active
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(kSmallPacket))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(200));
|
|
stream2->MaybeMakeActive();
|
|
|
|
// Stream with priority = 500 -> inverse weight ~=20
|
|
StrictMock<MockStreamProducer> callback3;
|
|
EXPECT_CALL(callback3, Produce)
|
|
// virtual finish time ~ 0 + 20 * 20 = 400
|
|
.WillOnce(CreateChunk(StreamID(3), MID(300), kSmallPacket))
|
|
// virtual finish time ~ 400 + 50 * 20 = 1400
|
|
.WillOnce(CreateChunk(StreamID(3), MID(301), kMediumPacket))
|
|
// virtual finish time ~ 1400 + 70 * 20 = 2800
|
|
.WillOnce(CreateChunk(StreamID(3), MID(302), kLargePacket));
|
|
EXPECT_CALL(callback3, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(kSmallPacket)) // When making active
|
|
.WillOnce(Return(kMediumPacket))
|
|
.WillOnce(Return(kLargePacket))
|
|
.WillOnce(Return(0));
|
|
auto stream3 =
|
|
scheduler.CreateStream(&callback3, StreamID(3), StreamPriority(500));
|
|
stream3->MaybeMakeActive();
|
|
|
|
// t ~= 400
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(300)));
|
|
// t ~= 1400
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(301)));
|
|
// t ~= 2500
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(200)));
|
|
// t ~= 2800
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(302)));
|
|
// t ~= 4000
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
|
|
// t ~= 5600
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(101)));
|
|
// t ~= 6000
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(201)));
|
|
// t ~= 7000
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(202)));
|
|
// t ~= 11200
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(102)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInTwoStreamsByPriority) {
|
|
// A simple test with two streams of different priority, but sending packets
|
|
// of identical size. Verifies that the ratio of sent packets represent their
|
|
// priority.
|
|
StreamScheduler scheduler(kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 15);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 10U);
|
|
}
|
|
|
|
TEST(StreamSchedulerTest, WillDistributeWFQPacketsInFourStreamsByPriority) {
|
|
// Same as `WillDistributeWFQPacketsInTwoStreamsByPriority` but with more
|
|
// streams.
|
|
StreamScheduler scheduler(kMtu);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(100), kPayloadSize);
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(200), kPayloadSize);
|
|
TestStream stream3(scheduler, StreamID(3), StreamPriority(300), kPayloadSize);
|
|
TestStream stream4(scheduler, StreamID(4), StreamPriority(400), kPayloadSize);
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 50);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 5U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 10U);
|
|
EXPECT_EQ(packet_counts[StreamID(3)], 15U);
|
|
EXPECT_EQ(packet_counts[StreamID(4)], 20U);
|
|
}
|
|
|
|
TEST(StreamSchedulerTest, WillDistributeFromTwoStreamsFairly) {
|
|
// A simple test with two streams of different priority, but sending packets
|
|
// of different size. Verifies that the ratio of total packet payload
|
|
// represent their priority.
|
|
// In this example,
|
|
// * stream1 has priority 100 and sends packets of size 8
|
|
// * stream2 has priority 400 and sends packets of size 4
|
|
// With round robin, stream1 would get twice as many payload bytes on the wire
|
|
// as stream2, but with WFQ and a 4x priority increase, stream2 should 4x as
|
|
// many payload bytes on the wire. That translates to stream2 getting 8x as
|
|
// many packets on the wire as they are half as large.
|
|
StreamScheduler scheduler(kMtu);
|
|
// Enable WFQ scheduler.
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
|
|
/*packet_size=*/8);
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(400),
|
|
/*packet_size=*/4);
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 90);
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 10U);
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 80U);
|
|
}
|
|
|
|
TEST(StreamSchedulerTest, WillDistributeFromFourStreamsFairly) {
|
|
// Same as `WillDistributeWeightedFairFromTwoStreamsFairly` but more
|
|
// complicated.
|
|
StreamScheduler scheduler(kMtu);
|
|
// Enable WFQ scheduler.
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
TestStream stream1(scheduler, StreamID(1), StreamPriority(100),
|
|
/*packet_size=*/10);
|
|
TestStream stream2(scheduler, StreamID(2), StreamPriority(200),
|
|
/*packet_size=*/10);
|
|
TestStream stream3(scheduler, StreamID(3), StreamPriority(200),
|
|
/*packet_size=*/20);
|
|
TestStream stream4(scheduler, StreamID(4), StreamPriority(400),
|
|
/*packet_size=*/30);
|
|
|
|
std::map<StreamID, size_t> packet_counts = GetPacketCounts(scheduler, 80);
|
|
// 15 packets * 10 bytes = 150 bytes at priority 100.
|
|
EXPECT_EQ(packet_counts[StreamID(1)], 15U);
|
|
// 30 packets * 10 bytes = 300 bytes at priority 200.
|
|
EXPECT_EQ(packet_counts[StreamID(2)], 30U);
|
|
// 15 packets * 20 bytes = 300 bytes at priority 200.
|
|
EXPECT_EQ(packet_counts[StreamID(3)], 15U);
|
|
// 20 packets * 30 bytes = 600 bytes at priority 400.
|
|
EXPECT_EQ(packet_counts[StreamID(4)], 20U);
|
|
}
|
|
|
|
// Sending large messages with small MTU will fragment the messages and produce
|
|
// a first fragment not larger than the MTU, and will then not first send from
|
|
// the stream with the smallest message, as their first fragment will be equally
|
|
// small for both streams. See `LargeMessageWithLargeMtu` for the same test, but
|
|
// with a larger MTU.
|
|
TEST(StreamSchedulerTest, SendLargeMessageWithSmallMtu) {
|
|
StreamScheduler scheduler(100 + SctpPacket::kHeaderSize +
|
|
IDataChunk::kHeaderSize);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(0), 100))
|
|
.WillOnce(CreateChunk(StreamID(1), MID(0), 100));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(200)) // When making active
|
|
.WillOnce(Return(100))
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(1), 100))
|
|
.WillOnce(CreateChunk(StreamID(2), MID(1), 50));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(150)) // When making active
|
|
.WillOnce(Return(50))
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
|
|
stream2->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
// Sending large messages with large MTU will not fragment messages and will
|
|
// send the message first from the stream that has the smallest message.
|
|
TEST(StreamSchedulerTest, SendLargeMessageWithLargeMtu) {
|
|
StreamScheduler scheduler(200 + SctpPacket::kHeaderSize +
|
|
IDataChunk::kHeaderSize);
|
|
scheduler.EnableMessageInterleaving(true);
|
|
|
|
StrictMock<MockStreamProducer> producer1;
|
|
EXPECT_CALL(producer1, Produce)
|
|
.WillOnce(CreateChunk(StreamID(1), MID(0), 200));
|
|
EXPECT_CALL(producer1, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(200)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream1 =
|
|
scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(1));
|
|
stream1->MaybeMakeActive();
|
|
|
|
StrictMock<MockStreamProducer> producer2;
|
|
EXPECT_CALL(producer2, Produce)
|
|
.WillOnce(CreateChunk(StreamID(2), MID(1), 150));
|
|
EXPECT_CALL(producer2, bytes_to_send_in_next_message)
|
|
.WillOnce(Return(150)) // When making active
|
|
.WillOnce(Return(0));
|
|
auto stream2 =
|
|
scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(1));
|
|
stream2->MaybeMakeActive();
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(1)));
|
|
EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
|
|
EXPECT_EQ(scheduler.Produce(TimeMs(0), kMtu), absl::nullopt);
|
|
}
|
|
|
|
} // namespace
|
|
} // namespace dcsctp
|