dcsctp: Determine chunks to be retransmitted fast

Before this CL, before sending out any chunk, all inflight data chunks
were inspected to find out if they were supposed to be retransmitted.

When the congestion window is large, this is a lot of data chunks to
inspect, which takes time.

By having a separate collection for chunks to be retransmitted, this
becomes a much faster operation. In most cases, constant in time.

Bug: webrtc:12799
Change-Id: I0d43ba7a88656eead26d5e0b9c4735622a8d080e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/219626
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34178}
This commit is contained in:
Victor Boivie
2021-05-22 22:56:07 +02:00
committed by WebRTC LUCI CQ
parent 82aa094a97
commit 7b4fd5ca59
2 changed files with 44 additions and 18 deletions

View File

@ -14,6 +14,7 @@
#include <functional>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
@ -83,17 +84,21 @@ RetransmissionQueue::RetransmissionQueue(
send_queue_(send_queue) {}
bool RetransmissionQueue::IsConsistent() const {
size_t actual_outstanding_bytes = absl::c_accumulate(
outstanding_data_, 0,
[&](size_t r, const std::pair<const UnwrappedTSN, TxData>& d) {
// Packets that have been ACKED or NACKED are not outstanding, as they
// are received. And packets that are marked for retransmission or
// abandoned are lost, and not outstanding.
return r + (d.second.is_outstanding()
? GetSerializedChunkSize(d.second.data())
: 0);
});
return actual_outstanding_bytes == outstanding_bytes_;
size_t actual_outstanding_bytes = 0;
std::set<UnwrappedTSN> actual_to_be_retransmitted;
for (const auto& elem : outstanding_data_) {
if (elem.second.is_outstanding()) {
actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data());
}
if (elem.second.should_be_retransmitted()) {
actual_to_be_retransmitted.insert(elem.first);
}
}
return actual_outstanding_bytes == outstanding_bytes_ &&
actual_to_be_retransmitted == to_be_retransmitted_;
}
// Returns how large a chunk will be, serialized, carrying the data
@ -110,6 +115,8 @@ void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
ack_info.acked_tsns.push_back(it->first.Wrap());
if (it->second.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(it->second.data());
} else if (it->second.should_be_retransmitted()) {
to_be_retransmitted_.erase(it->first);
}
}
@ -137,6 +144,9 @@ void RetransmissionQueue::AckGapBlocks(
if (iter->second.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
}
if (iter->second.should_be_retransmitted()) {
to_be_retransmitted_.erase(iter->first);
}
iter->second.Ack();
ack_info.highest_tsn_acked =
std::max(ack_info.highest_tsn_acked, iter->first);
@ -184,6 +194,7 @@ void RetransmissionQueue::NackBetweenAckBlocks(
if (iter->second.Nack()) {
ack_info.has_packet_loss = true;
to_be_retransmitted_.insert(iter->first);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << *iter->first.Wrap()
<< " marked for retransmission";
}
@ -497,6 +508,7 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
outstanding_bytes_ -= GetSerializedChunkSize(item.data());
}
if (item.Nack(/*retransmit_now=*/true)) {
to_be_retransmitted_.insert(tsn);
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Chunk " << *tsn.Wrap()
<< " will be retransmitted due to T3-RTX";
++count;
@ -521,19 +533,27 @@ void RetransmissionQueue::HandleT3RtxTimerExpiry() {
std::vector<std::pair<TSN, Data>>
RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
std::vector<std::pair<TSN, Data>> result;
for (auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
TxData& item = elem.second;
size_t serialized_size = GetSerializedChunkSize(item.data());
if (item.should_be_retransmitted() && serialized_size <= max_size) {
for (auto it = to_be_retransmitted_.begin();
it != to_be_retransmitted_.end();) {
UnwrappedTSN tsn = *it;
auto elem = outstanding_data_.find(tsn);
RTC_DCHECK(elem != outstanding_data_.end());
TxData& item = elem->second;
RTC_DCHECK(item.should_be_retransmitted());
RTC_DCHECK(!item.is_outstanding());
RTC_DCHECK(!item.is_abandoned());
RTC_DCHECK(!item.is_acked());
size_t serialized_size = GetSerializedChunkSize(item.data());
if (serialized_size <= max_size) {
item.Retransmit();
result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size;
outstanding_bytes_ += serialized_size;
it = to_be_retransmitted_.erase(it);
} else {
++it;
}
// No point in continuing if the packet is full.
if (max_size <= data_chunk_header_size_) {
@ -780,6 +800,9 @@ void RetransmissionQueue::ExpireAllFor(
other.data().message_id == item.data().message_id) {
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Marking chunk " << *tsn.Wrap()
<< " as abandoned";
if (other.should_be_retransmitted()) {
to_be_retransmitted_.erase(tsn);
}
other.Abandon();
}
}

View File

@ -13,6 +13,7 @@
#include <cstdint>
#include <functional>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
@ -360,6 +361,8 @@ class RetransmissionQueue {
// cumulative acked. Note that it also contains chunks that have been acked in
// gap ack blocks.
std::map<UnwrappedTSN, TxData> outstanding_data_;
// Data chunks that are to be retransmitted.
std::set<UnwrappedTSN> to_be_retransmitted_;
// The number of bytes that are in-flight (sent but not yet acked or nacked).
size_t outstanding_bytes_ = 0;
};