dcsctp: Add proper fast retransmission support

This CL makes OutstandingData keep track of chunks that are eligible for
fast retransmission. When the socket goes into fast recovery, the
reported missing chunks can be retransmitted quickly (ignoring the
congestion window) according to
https://datatracker.ietf.org/doc/html/rfc4960#section-7.2.4.

The CL also adds the new API to OutstandingData to retrieve only the
chunks that are eligible for fast retransmission, and moves the
remaining chunks to the ordinary list of chunks to be retransmitted
later.

This solves an issue where the retransmission timer wouldn't start if
there wouldn't be any chunks to fast-retransmit.

It doesn't, however, make sure that chunks that should be fast
retransmitted can send even when the congestion window is full. That
will be solved in the follow-up CL.

Bug: webrtc:13969
Change-Id: If4012d1cb284ef4a2d815683ed60cbbbff5b3c3b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/259865
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36721}
This commit is contained in:
Victor Boivie
2022-04-22 11:45:19 +02:00
committed by WebRTC LUCI CQ
parent c706220cbd
commit a5fecb3917
5 changed files with 106 additions and 38 deletions

View File

@ -72,7 +72,13 @@ bool OutstandingData::IsConsistent() const {
size_t actual_outstanding_bytes = 0;
size_t actual_outstanding_items = 0;
std::set<UnwrappedTSN> actual_to_be_retransmitted;
std::set<UnwrappedTSN> combined_to_be_retransmitted;
combined_to_be_retransmitted.insert(to_be_retransmitted_.begin(),
to_be_retransmitted_.end());
combined_to_be_retransmitted.insert(to_be_fast_retransmitted_.begin(),
to_be_fast_retransmitted_.end());
std::set<UnwrappedTSN> actual_combined_to_be_retransmitted;
for (const auto& [tsn, item] : outstanding_data_) {
if (item.is_outstanding()) {
actual_outstanding_bytes += GetSerializedChunkSize(item.data());
@ -80,7 +86,7 @@ bool OutstandingData::IsConsistent() const {
}
if (item.should_be_retransmitted()) {
actual_to_be_retransmitted.insert(tsn);
actual_combined_to_be_retransmitted.insert(tsn);
}
}
@ -91,7 +97,7 @@ bool OutstandingData::IsConsistent() const {
return actual_outstanding_bytes == outstanding_bytes_ &&
actual_outstanding_items == outstanding_items_ &&
actual_to_be_retransmitted == to_be_retransmitted_;
actual_combined_to_be_retransmitted == combined_to_be_retransmitted;
}
void OutstandingData::AckChunk(AckInfo& ack_info,
@ -105,6 +111,7 @@ void OutstandingData::AckChunk(AckInfo& ack_info,
}
if (iter->second.should_be_retransmitted()) {
to_be_retransmitted_.erase(iter->first);
to_be_fast_retransmitted_.erase(iter->first);
}
iter->second.Ack();
ack_info.highest_tsn_acked =
@ -115,7 +122,7 @@ void OutstandingData::AckChunk(AckInfo& ack_info,
OutstandingData::AckInfo OutstandingData::HandleSack(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_retransmit) {
bool is_in_fast_recovery) {
OutstandingData::AckInfo ack_info(cumulative_tsn_ack);
// Erase all items up to cumulative_tsn_ack.
RemoveAcked(cumulative_tsn_ack, ack_info);
@ -124,8 +131,8 @@ OutstandingData::AckInfo OutstandingData::HandleSack(
AckGapBlocks(cumulative_tsn_ack, gap_ack_blocks, ack_info);
// NACK and possibly mark for retransmit chunks that weren't acked.
NackBetweenAckBlocks(cumulative_tsn_ack, gap_ack_blocks,
is_in_fast_retransmit, ack_info);
NackBetweenAckBlocks(cumulative_tsn_ack, gap_ack_blocks, is_in_fast_recovery,
ack_info);
RTC_DCHECK(IsConsistent());
return ack_info;
@ -196,8 +203,9 @@ void OutstandingData::NackBetweenAckBlocks(
for (auto iter = outstanding_data_.upper_bound(prev_block_last_acked);
iter != outstanding_data_.lower_bound(cur_block_first_acked); ++iter) {
if (iter->first <= max_tsn_to_nack) {
ack_info.has_packet_loss =
NackItem(iter->first, iter->second, /*retransmit_now=*/false);
ack_info.has_packet_loss |=
NackItem(iter->first, iter->second, /*retransmit_now=*/false,
/*do_fast_retransmit=*/!is_in_fast_recovery);
}
}
prev_block_last_acked = UnwrappedTSN::AddTo(cumulative_tsn_ack, block.end);
@ -211,7 +219,8 @@ void OutstandingData::NackBetweenAckBlocks(
bool OutstandingData::NackItem(UnwrappedTSN tsn,
Item& item,
bool retransmit_now) {
bool retransmit_now,
bool do_fast_retransmit) {
if (item.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(item.data());
--outstanding_items_;
@ -221,7 +230,11 @@ bool OutstandingData::NackItem(UnwrappedTSN tsn,
case Item::NackAction::kNothing:
return false;
case Item::NackAction::kRetransmit:
to_be_retransmitted_.insert(tsn);
if (do_fast_retransmit) {
to_be_fast_retransmitted_.insert(tsn);
} else {
to_be_retransmitted_.insert(tsn);
}
RTC_DLOG(LS_VERBOSE) << *tsn.Wrap() << " marked for retransmission";
break;
case Item::NackAction::kAbandon:
@ -270,6 +283,8 @@ void OutstandingData::AbandonAllFor(const Item& item) {
RTC_DLOG(LS_VERBOSE) << "Marking chunk " << *tsn.Wrap()
<< " as abandoned";
if (other.should_be_retransmitted()) {
RTC_DCHECK(to_be_fast_retransmitted_.find(tsn) ==
to_be_fast_retransmitted_.end());
to_be_retransmitted_.erase(tsn);
}
other.Abandon();
@ -277,12 +292,12 @@ void OutstandingData::AbandonAllFor(const Item& item) {
}
}
std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
std::vector<std::pair<TSN, Data>> OutstandingData::ExtractChunksThatCanFit(
std::set<UnwrappedTSN>& chunks,
size_t max_size) {
std::vector<std::pair<TSN, Data>> result;
for (auto it = to_be_retransmitted_.begin();
it != to_be_retransmitted_.end();) {
for (auto it = chunks.begin(); it != chunks.end();) {
UnwrappedTSN tsn = *it;
auto elem = outstanding_data_.find(tsn);
RTC_DCHECK(elem != outstanding_data_.end());
@ -299,7 +314,7 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
max_size -= serialized_size;
outstanding_bytes_ += serialized_size;
++outstanding_items_;
it = to_be_retransmitted_.erase(it);
it = chunks.erase(it);
} else {
++it;
}
@ -308,11 +323,37 @@ std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
break;
}
}
return result;
}
std::vector<std::pair<TSN, Data>>
OutstandingData::GetChunksToBeFastRetransmitted(size_t max_size) {
std::vector<std::pair<TSN, Data>> result =
ExtractChunksThatCanFit(to_be_fast_retransmitted_, max_size);
// https://datatracker.ietf.org/doc/html/rfc4960#section-7.2.4
// "Those TSNs marked for retransmission due to the Fast-Retransmit algorithm
// that did not fit in the sent datagram carrying K other TSNs are also marked
// as ineligible for a subsequent Fast Retransmit. However, as they are
// marked for retransmission they will be retransmitted later on as soon as
// cwnd allows."
if (!to_be_fast_retransmitted_.empty()) {
to_be_retransmitted_.insert(to_be_fast_retransmitted_.begin(),
to_be_fast_retransmitted_.end());
to_be_fast_retransmitted_.clear();
}
RTC_DCHECK(IsConsistent());
return result;
}
std::vector<std::pair<TSN, Data>> OutstandingData::GetChunksToBeRetransmitted(
size_t max_size) {
// Chunks scheduled for fast retransmission must be sent first.
RTC_DCHECK(to_be_fast_retransmitted_.empty());
return ExtractChunksThatCanFit(to_be_retransmitted_, max_size);
}
void OutstandingData::ExpireOutstandingChunks(TimeMs now) {
for (const auto& [tsn, item] : outstanding_data_) {
// Chunks that are nacked can be expired. Care should be taken not to expire
@ -373,7 +414,8 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
void OutstandingData::NackAll() {
for (auto& [tsn, item] : outstanding_data_) {
if (!item.is_acked()) {
NackItem(tsn, item, /*retransmit_now=*/true);
NackItem(tsn, item, /*retransmit_now=*/true,
/*do_fast_retransmit=*/false);
}
}
RTC_DCHECK(IsConsistent());

View File

@ -77,7 +77,14 @@ class OutstandingData {
AckInfo HandleSack(
UnwrappedTSN cumulative_tsn_ack,
rtc::ArrayView<const SackChunk::GapAckBlock> gap_ack_blocks,
bool is_in_fast_retransmit);
bool is_in_fast_recovery);
// Returns as many of the chunks that are eligible for fast retransmissions
// and that would fit in a single packet of `max_size`. The eligible chunks
// that didn't fit will be marked for (normal) retransmission and will not be
// returned if this method is called again.
std::vector<std::pair<TSN, Data>> GetChunksToBeFastRetransmitted(
size_t max_size);
// Given `max_size` of space left in a packet, which chunks can be added to
// it?
@ -94,8 +101,12 @@ class OutstandingData {
bool empty() const { return outstanding_data_.empty(); }
bool has_data_to_be_fast_retransmitted() const {
return !to_be_fast_retransmitted_.empty();
}
bool has_data_to_be_retransmitted() const {
return !to_be_retransmitted_.empty();
return !to_be_retransmitted_.empty() || !to_be_fast_retransmitted_.empty();
}
UnwrappedTSN last_cumulative_tsn_ack() const {
@ -167,7 +178,7 @@ class OutstandingData {
// is set, it might be marked for retransmission. If the item has reached
// its max retransmission value, it will instead be abandoned. The action
// performed is indicated as return value.
NackAction Nack(bool retransmit_now = false);
NackAction Nack(bool retransmit_now);
// Prepares the item to be retransmitted. Sets it as outstanding and
// clears all nack counters.
@ -258,21 +269,32 @@ class OutstandingData {
bool is_in_fast_recovery,
OutstandingData::AckInfo& ack_info);
// Acks the chunk referenced by `iter` and updates state in `ack_info` and the
// object's state.
// Process the acknowledgement of the chunk referenced by `iter` and updates
// state in `ack_info` and the object's state.
void AckChunk(AckInfo& ack_info, std::map<UnwrappedTSN, Item>::iterator iter);
// Helper method to nack an item and perform the correct operations given the
// action indicated when nacking an item (e.g. retransmitting or abandoning).
// The return value indicate if an action was performed, meaning that packet
// loss was detected and acted upon.
bool NackItem(UnwrappedTSN tsn, Item& item, bool retransmit_now);
// Helper method to process an incoming nack of an item and perform the
// correct operations given the action indicated when nacking an item (e.g.
// retransmitting or abandoning). The return value indicate if an action was
// performed, meaning that packet loss was detected and acted upon. If
// `do_fast_retransmit` is set and if the item has been nacked sufficiently
// many times so that it should be retransmitted, this will schedule it to be
// "fast retransmitted". This is only done just before going into fast
// recovery.
bool NackItem(UnwrappedTSN tsn,
Item& item,
bool retransmit_now,
bool do_fast_retransmit);
// Given that a message fragment, `item` has been abandoned, abandon all other
// fragments that share the same message - both never-before-sent fragments
// that are still in the SendQueue and outstanding chunks.
void AbandonAllFor(const OutstandingData::Item& item);
std::vector<std::pair<TSN, Data>> ExtractChunksThatCanFit(
std::set<UnwrappedTSN>& chunks,
size_t max_size);
bool IsConsistent() const;
// The size of the data chunk (DATA/I-DATA) header that is used.
@ -290,6 +312,8 @@ class OutstandingData {
// The number of DATA chunks that are in-flight (sent but not yet acked or
// nacked).
size_t outstanding_items_ = 0;
// Data chunks that are eligible for fast retransmission.
std::set<UnwrappedTSN> to_be_fast_retransmitted_;
// Data chunks that are to be retransmitted.
std::set<UnwrappedTSN> to_be_retransmitted_;
};

View File

@ -28,6 +28,7 @@ using ::testing::MockFunction;
using State = ::dcsctp::OutstandingData::State;
using ::testing::_;
using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::Pair;
using ::testing::Return;
using ::testing::StrictMock;
@ -203,8 +204,9 @@ TEST_F(OutstandingDataTest, NacksThreeTimesResultsInRetransmission) {
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked)));
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000),
EXPECT_THAT(buf_.GetChunksToBeFastRetransmitted(1000),
ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), IsEmpty());
}
TEST_F(OutstandingDataTest, NacksThreeTimesResultsInAbandoning) {
@ -446,8 +448,9 @@ TEST_F(OutstandingDataTest, MustRetransmitBeforeGettingNackedAgain) {
EXPECT_TRUE(buf_.has_data_to_be_retransmitted());
// Now it's retransmitted.
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000),
EXPECT_THAT(buf_.GetChunksToBeFastRetransmitted(1000),
ElementsAre(Pair(TSN(10), _)));
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), IsEmpty());
// And obviously lost, as it will get NACKed and abandoned.
std::vector<SackChunk::GapAckBlock> gab7 = {SackChunk::GapAckBlock(2, 8)};

View File

@ -278,8 +278,12 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
UpdateRTT(now, cumulative_tsn_ack);
}
// Exit fast recovery before continuing processing, in case it needs to go
// into fast recovery again due to new reported packet loss.
MaybeExitFastRecovery(cumulative_tsn_ack);
OutstandingData::AckInfo ack_info = outstanding_data_.HandleSack(
cumulative_tsn_ack, sack.gap_ack_blocks(), is_in_fast_retransmit_);
cumulative_tsn_ack, sack.gap_ack_blocks(), is_in_fast_recovery());
// Update of outstanding_data_ is now done. Congestion control remains.
UpdateReceiverWindow(sack.a_rwnd());
@ -292,8 +296,6 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
<< old_outstanding_bytes << "), rwnd=" << rwnd_ << " ("
<< old_rwnd << ")";
MaybeExitFastRecovery(cumulative_tsn_ack);
if (cumulative_tsn_ack > old_last_cumulative_tsn_ack) {
// https://tools.ietf.org/html/rfc4960#section-6.3.2
// "Whenever a SACK is received that acknowledges the DATA chunk
@ -308,7 +310,6 @@ bool RetransmissionQueue::HandleSack(TimeMs now, const SackChunk& sack) {
}
if (ack_info.has_packet_loss) {
is_in_fast_retransmit_ = true;
HandlePacketLoss(ack_info.highest_tsn_acked);
}
@ -395,16 +396,15 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
std::vector<std::pair<TSN, Data>> to_be_sent;
size_t old_outstanding_bytes = outstanding_bytes();
size_t old_rwnd = rwnd_;
if (is_in_fast_retransmit_) {
if (outstanding_data_.has_data_to_be_fast_retransmitted()) {
// https://tools.ietf.org/html/rfc4960#section-7.2.4
// "Determine how many of the earliest (i.e., lowest TSN) DATA chunks
// marked for retransmission will fit into a single packet ... Retransmit
// those K DATA chunks in a single packet. When a Fast Retransmit is being
// performed, the sender SHOULD ignore the value of cwnd and SHOULD NOT
// delay retransmission for this single packet."
is_in_fast_retransmit_ = false;
to_be_sent =
outstanding_data_.GetChunksToBeRetransmitted(bytes_remaining_in_packet);
to_be_sent = outstanding_data_.GetChunksToBeFastRetransmitted(
bytes_remaining_in_packet);
size_t to_be_sent_bytes = absl::c_accumulate(
to_be_sent, 0, [&](size_t r, const std::pair<TSN, Data>& d) {
return r + GetSerializedChunkSize(d.second);
@ -412,7 +412,8 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "fast-retransmit: sending "
<< to_be_sent.size() << " chunks, " << to_be_sent_bytes
<< " bytes";
} else {
}
if (to_be_sent.empty()) {
// Normal sending. Calculate the bandwidth budget (how many bytes that is
// allowed to be sent), and fill that up first with chunks that are
// scheduled to be retransmitted. If there is still budget, send new chunks

View File

@ -229,8 +229,6 @@ class RetransmissionQueue {
// If set, fast recovery is enabled until this TSN has been cumulative
// acked.
absl::optional<UnwrappedTSN> fast_recovery_exit_tsn_ = absl::nullopt;
// Indicates if the congestion algorithm is in fast retransmit.
bool is_in_fast_retransmit_ = false;
// The send queue.
SendQueue& send_queue_;