dcsctp: Use strong type for MaxRetransmits

It's put in the public folder since the intention is to expose it in
SendOptions.

Additionally, use TimeMs::InfiniteFuture() to represent sending a
message with no limited lifetime (i.e. to send it reliably).

One benefit for these two is avoiding using absl::optional more than
necessary, as it results in larger struct sizes for the outstanding
data chunks.

Bug: webrtc:12943
Change-Id: I87a340f0e0905342878fe9d2a74869bfcd6b0076
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/235984
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35323}
This commit is contained in:
Victor Boivie
2021-10-22 20:54:31 +02:00
committed by WebRTC LUCI CQ
parent b08290b3c5
commit 42a850d250
9 changed files with 149 additions and 94 deletions

View File

@ -105,6 +105,18 @@ constexpr inline DurationMs operator-(TimeMs lhs, TimeMs rhs) {
return DurationMs(*lhs - *rhs);
}
// The maximum number of times the socket should attempt to retransmit a
// message which fails the first time in unreliable mode.
class MaxRetransmits : public webrtc::StrongAlias<class TimeMsTag, uint16_t> {
public:
constexpr explicit MaxRetransmits(const UnderlyingType& v)
: webrtc::StrongAlias<class TimeMsTag, uint16_t>(v) {}
// There should be no limit - the message should be sent reliably.
static constexpr MaxRetransmits NoLimit() {
return MaxRetransmits(std::numeric_limits<uint16_t>::max());
}
};
} // namespace dcsctp
#endif // NET_DCSCTP_PUBLIC_TYPES_H_

View File

@ -15,13 +15,15 @@
#include <vector>
#include "net/dcsctp/common/math.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/public/types.h"
#include "rtc_base/logging.h"
namespace dcsctp {
// The number of times a packet must be NACKed before it's retransmitted.
// See https://tools.ietf.org/html/rfc4960#section-7.2.4
constexpr size_t kNumberOfNacksForRetransmission = 3;
constexpr uint8_t kNumberOfNacksForRetransmission = 3;
// Returns how large a chunk will be, serialized, carrying the data
size_t OutstandingData::GetSerializedChunkSize(const Data& data) const {
@ -40,8 +42,7 @@ OutstandingData::Item::NackAction OutstandingData::Item::Nack(
if ((retransmit_now || nack_count_ >= kNumberOfNacksForRetransmission) &&
!is_abandoned_) {
// Nacked enough times - it's considered lost.
if (!max_retransmissions_.has_value() ||
num_retransmissions_ < max_retransmissions_) {
if (num_retransmissions_ < *max_retransmissions_) {
should_be_retransmitted_ = true;
return NackAction::kRetransmit;
}
@ -65,7 +66,7 @@ void OutstandingData::Item::Abandon() {
}
bool OutstandingData::Item::has_expired(TimeMs now) const {
return expires_at_.has_value() && *expires_at_ <= now;
return expires_at_ <= now;
}
bool OutstandingData::IsConsistent() const {
@ -251,8 +252,9 @@ void OutstandingData::AbandonAllFor(const Item& item) {
Data::IsEnd(true), item.data().is_unordered);
Item& added_item =
outstanding_data_
.emplace(tsn, Item(std::move(message_end), absl::nullopt, TimeMs(0),
absl::nullopt))
.emplace(tsn,
Item(std::move(message_end), MaxRetransmits::NoLimit(),
TimeMs(0), TimeMs::InfiniteFuture()))
.first->second;
// The added chunk shouldn't be included in `outstanding_bytes`, so set it
// as acked.
@ -345,9 +347,9 @@ UnwrappedTSN OutstandingData::highest_outstanding_tsn() const {
absl::optional<UnwrappedTSN> OutstandingData::Insert(
const Data& data,
absl::optional<size_t> max_retransmissions,
MaxRetransmits max_retransmissions,
TimeMs time_sent,
absl::optional<TimeMs> expires_at) {
TimeMs expires_at) {
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();

View File

@ -109,11 +109,10 @@ class OutstandingData {
// Schedules `data` to be sent, with the provided partial reliability
// parameters. Returns the TSN if the item was actually added and scheduled to
// be sent, and absl::nullopt if it shouldn't be sent.
absl::optional<UnwrappedTSN> Insert(
const Data& data,
absl::optional<size_t> max_retransmissions,
TimeMs time_sent,
absl::optional<TimeMs> expires_at);
absl::optional<UnwrappedTSN> Insert(const Data& data,
MaxRetransmits max_retransmissions,
TimeMs time_sent,
TimeMs expires_at);
// Nacks all outstanding data.
void NackAll();
@ -149,9 +148,9 @@ class OutstandingData {
};
explicit Item(Data data,
absl::optional<size_t> max_retransmissions,
MaxRetransmits max_retransmissions,
TimeMs time_sent,
absl::optional<TimeMs> expires_at)
TimeMs expires_at)
: max_retransmissions_(max_retransmissions),
time_sent_(time_sent),
expires_at_(expires_at),
@ -207,18 +206,18 @@ class OutstandingData {
// The number of times the DATA chunk has been nacked (by having received a
// SACK which doesn't include it). Will be cleared on retransmissions.
size_t nack_count_ = 0;
uint8_t nack_count_ = 0;
// The number of times the DATA chunk has been retransmitted.
size_t num_retransmissions_ = 0;
uint16_t num_retransmissions_ = 0;
// If the message was sent with a maximum number of retransmissions, this is
// set to that number. The value zero (0) means that it will never be
// retransmitted.
const absl::optional<size_t> max_retransmissions_;
const MaxRetransmits max_retransmissions_;
// When the packet was sent, and placed in this queue.
const TimeMs time_sent_;
// If the message was sent with an expiration time, this is set. At this
// exact millisecond, the item is considered expired.
const absl::optional<TimeMs> expires_at_;
// At this exact millisecond, the item is considered expired. If the message
// is not to be expired, this is set to the infinite future.
const TimeMs expires_at_;
// The actual data to send/retransmit.
Data data_;
};

View File

@ -65,7 +65,8 @@ TEST_F(OutstandingDataTest, HasInitialState) {
TEST_F(OutstandingDataTest, InsertChunk) {
ASSERT_HAS_VALUE_AND_ASSIGN(
UnwrappedTSN tsn,
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt));
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture()));
EXPECT_EQ(tsn.Wrap(), TSN(10));
@ -81,7 +82,8 @@ TEST_F(OutstandingDataTest, InsertChunk) {
}
TEST_F(OutstandingDataTest, AcksSingleChunk) {
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
OutstandingData::AckInfo ack =
buf_.HandleSack(unwrapper_.Unwrap(TSN(10)), {}, false);
@ -100,7 +102,8 @@ TEST_F(OutstandingDataTest, AcksSingleChunk) {
}
TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), {}, false);
EXPECT_EQ(buf_.outstanding_bytes(), DataChunk::kHeaderSize + RoundUpTo4(1));
@ -115,8 +118,10 @@ TEST_F(OutstandingDataTest, AcksPreviousChunkDoesntUpdate) {
}
TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "B"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
std::vector<SackChunk::GapAckBlock> gab = {SackChunk::GapAckBlock(2, 2)};
OutstandingData::AckInfo ack =
@ -138,8 +143,10 @@ TEST_F(OutstandingDataTest, AcksAndNacksWithGapAckBlocks) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) {
buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "B"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -161,10 +168,14 @@ TEST_F(OutstandingDataTest, NacksThreeTimesWithSameTsnDoesntRetransmit) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) {
buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "B"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -197,11 +208,15 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
static constexpr uint16_t kMaxRetransmissions = 0;
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow, absl::nullopt);
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -233,11 +248,15 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
}
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
static constexpr uint16_t kMaxRetransmissions = 0;
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
@ -271,16 +290,16 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoningWithPlaceholder) {
TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) {
static constexpr TimeMs kExpiresAt = kNow + DurationMs(1);
EXPECT_TRUE(
buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, kExpiresAt)
.has_value());
EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt,
EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, "B"), MaxRetransmits::NoLimit(),
kNow, kExpiresAt)
.has_value());
EXPECT_TRUE(buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(),
kNow + DurationMs(0), kExpiresAt)
.has_value());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
EXPECT_FALSE(buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt,
EXPECT_FALSE(buf_.Insert(gen_.Ordered({1}, "E"), MaxRetransmits::NoLimit(),
kNow + DurationMs(1), kExpiresAt)
.has_value());
@ -296,10 +315,13 @@ TEST_F(OutstandingDataTest, ExpiresChunkBeforeItIsInserted) {
}
TEST_F(OutstandingDataTest, CanGenerateForwardTsn) {
static constexpr uint16_t kMaxRetransmissions = 0;
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow, absl::nullopt);
static constexpr MaxRetransmits kMaxRetransmissions(0);
buf_.Insert(gen_.Ordered({1}, "B"), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), kMaxRetransmissions, kNow,
TimeMs::InfiniteFuture());
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
@ -318,14 +340,22 @@ TEST_F(OutstandingDataTest, CanGenerateForwardTsn) {
}
TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) {
buf_.Insert(gen_.Ordered({1}, "B"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, ""), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "E"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "B"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, ""), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "E"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
@ -352,11 +382,12 @@ TEST_F(OutstandingDataTest, AckWithGapBlocksFromRFC4960Section334) {
}
TEST_F(OutstandingDataTest, MeasureRTT) {
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow, absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow + DurationMs(1),
absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "BE"), absl::nullopt, kNow + DurationMs(2),
absl::nullopt);
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(), kNow,
TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(),
kNow + DurationMs(1), TimeMs::InfiniteFuture());
buf_.Insert(gen_.Ordered({1}, "BE"), MaxRetransmits::NoLimit(),
kNow + DurationMs(2), TimeMs::InfiniteFuture());
static constexpr DurationMs kDuration(123);
ASSERT_HAS_VALUE_AND_ASSIGN(

View File

@ -440,8 +440,11 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
absl::optional<UnwrappedTSN> tsn = outstanding_data_.Insert(
chunk_opt->data,
partial_reliability_ ? chunk_opt->max_retransmissions : absl::nullopt,
now, partial_reliability_ ? chunk_opt->expires_at : absl::nullopt);
partial_reliability_ ? chunk_opt->max_retransmissions
: MaxRetransmits::NoLimit(),
now,
partial_reliability_ ? chunk_opt->expires_at
: TimeMs::InfiniteFuture());
if (tsn.has_value()) {
to_be_sent.emplace_back(tsn->Wrap(), std::move(chunk_opt->data));

View File

@ -354,7 +354,7 @@ TEST_F(RetransmissionQueueTest, LimitedRetransmissionOnlyWithRfc3758Support) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -384,7 +384,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsAsUdp) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -426,7 +426,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsToThreeSends) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 3;
dts.max_retransmissions = MaxRetransmits(3);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -517,17 +517,17 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -572,17 +572,17 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -627,28 +627,28 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(1);
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(2);
SendQueue::DataToSend dts(gen_.Unordered({1, 2, 3, 4}, "B", opts));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(3);
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "B", opts));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
DataGeneratorOptions opts;
opts.stream_id = StreamID(4);
SendQueue::DataToSend dts(gen_.Ordered({13, 14, 15, 16}, "B", opts));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -742,7 +742,7 @@ TEST_F(RetransmissionQueueTest, MeasureRTT) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -907,17 +907,17 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, ""));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
@ -1006,7 +1006,7 @@ TEST_F(RetransmissionQueueTest, LimitsRetransmissionsOnlyWhenNackedThreeTimes) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 0;
dts.max_retransmissions = MaxRetransmits(0);
return dts;
})
.WillOnce(CreateChunk())
@ -1077,7 +1077,7 @@ TEST_F(RetransmissionQueueTest, AbandonsRtxLimit2WhenNackedNineTimes) {
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "BE"));
dts.max_retransmissions = 2;
dts.max_retransmissions = MaxRetransmits(2);
return dts;
})
.WillOnce(CreateChunk())

View File

@ -11,6 +11,7 @@
#include <cstdint>
#include <deque>
#include <limits>
#include <map>
#include <utility>
#include <vector>
@ -49,7 +50,7 @@ bool RRSendQueue::OutgoingStream::HasDataToSend(TimeMs now) {
}
// Message has expired. Remove it and inspect the next one.
if (item.expires_at.has_value() && *item.expires_at <= now) {
if (item.expires_at <= now) {
buffered_amount_.Decrease(item.remaining_size);
total_buffered_amount_.Decrease(item.remaining_size);
items_.pop_front();
@ -125,7 +126,7 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
}
void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
absl::optional<TimeMs> expires_at,
TimeMs expires_at,
const SendOptions& send_options) {
buffered_amount_.Increase(message.payload().size());
total_buffered_amount_.Increase(message.payload().size());
@ -186,7 +187,14 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::OutgoingStream::Produce(
item->message_id.value(), fsn, ppid,
std::move(payload), is_beginning, is_end,
item->send_options.unordered));
chunk.max_retransmissions = item->send_options.max_retransmissions;
if (item->send_options.max_retransmissions.has_value() &&
*item->send_options.max_retransmissions >=
std::numeric_limits<MaxRetransmits::UnderlyingType>::min() &&
*item->send_options.max_retransmissions <=
std::numeric_limits<MaxRetransmits::UnderlyingType>::max()) {
chunk.max_retransmissions =
MaxRetransmits(*item->send_options.max_retransmissions);
}
chunk.expires_at = item->expires_at;
if (is_end) {
@ -278,7 +286,7 @@ void RRSendQueue::Add(TimeMs now,
RTC_DCHECK(!message.payload().empty());
// Any limited lifetime should start counting from now - when the message
// has been added to the queue.
absl::optional<TimeMs> expires_at = absl::nullopt;
TimeMs expires_at = TimeMs::InfiniteFuture();
if (send_options.lifetime.has_value()) {
// `expires_at` is the time when it expires. Which is slightly larger than
// the message's lifetime, as the message is alive during its entire

View File

@ -124,7 +124,7 @@ class RRSendQueue : public SendQueue {
// Enqueues a message to this stream.
void Add(DcSctpMessage message,
absl::optional<TimeMs> expires_at,
TimeMs expires_at,
const SendOptions& send_options);
// Possibly produces a data chunk to send.
@ -161,7 +161,7 @@ class RRSendQueue : public SendQueue {
// An enqueued message and metadata.
struct Item {
explicit Item(DcSctpMessage msg,
absl::optional<TimeMs> expires_at,
TimeMs expires_at,
const SendOptions& send_options)
: message(std::move(msg)),
expires_at(expires_at),
@ -169,7 +169,7 @@ class RRSendQueue : public SendQueue {
remaining_offset(0),
remaining_size(message.payload().size()) {}
DcSctpMessage message;
absl::optional<TimeMs> expires_at;
TimeMs expires_at;
SendOptions send_options;
// The remaining payload (offset and size) to be sent, when it has been
// fragmented.

View File

@ -32,8 +32,8 @@ class SendQueue {
Data data;
// Partial reliability - RFC3758
absl::optional<int> max_retransmissions;
absl::optional<TimeMs> expires_at;
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit();
TimeMs expires_at = TimeMs::InfiniteFuture();
};
virtual ~SendQueue() = default;