dcsctp: Track the number of inflight DATA items

This corresponds to one part of sstat_unackdata in RFC6458. The
remaining part is the data in the send queue, which isn't packetized
yet, so it must be estimated. But the DATA items in the retransmission
queue is already determined, so it can be easily tracked and retrieved.

Bug: webrtc:13052
Change-Id: I16c3b5b61eb6b3022d7104e6457d943d5df3d6b9
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228240
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34706}
This commit is contained in:
Victor Boivie
2021-08-09 12:26:11 +02:00
committed by WebRTC LUCI CQ
parent 2ddc39e2b9
commit 82ea522b27
3 changed files with 23 additions and 0 deletions

View File

@ -85,11 +85,13 @@ RetransmissionQueue::RetransmissionQueue(
bool RetransmissionQueue::IsConsistent() const { bool RetransmissionQueue::IsConsistent() const {
size_t actual_outstanding_bytes = 0; size_t actual_outstanding_bytes = 0;
size_t actual_outstanding_items = 0;
std::set<UnwrappedTSN> actual_to_be_retransmitted; std::set<UnwrappedTSN> actual_to_be_retransmitted;
for (const auto& elem : outstanding_data_) { for (const auto& elem : outstanding_data_) {
if (elem.second.is_outstanding()) { if (elem.second.is_outstanding()) {
actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data()); actual_outstanding_bytes += GetSerializedChunkSize(elem.second.data());
++actual_outstanding_items;
} }
if (elem.second.should_be_retransmitted()) { if (elem.second.should_be_retransmitted()) {
@ -98,6 +100,7 @@ bool RetransmissionQueue::IsConsistent() const {
} }
return actual_outstanding_bytes == outstanding_bytes_ && return actual_outstanding_bytes == outstanding_bytes_ &&
actual_outstanding_items == outstanding_items_ &&
actual_to_be_retransmitted == to_be_retransmitted_; actual_to_be_retransmitted == to_be_retransmitted_;
} }
@ -115,6 +118,7 @@ void RetransmissionQueue::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
ack_info.acked_tsns.push_back(it->first.Wrap()); ack_info.acked_tsns.push_back(it->first.Wrap());
if (it->second.is_outstanding()) { if (it->second.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(it->second.data()); outstanding_bytes_ -= GetSerializedChunkSize(it->second.data());
--outstanding_items_;
} else if (it->second.should_be_retransmitted()) { } else if (it->second.should_be_retransmitted()) {
to_be_retransmitted_.erase(it->first); to_be_retransmitted_.erase(it->first);
} }
@ -143,6 +147,7 @@ void RetransmissionQueue::AckGapBlocks(
iter->second.data().size(); iter->second.data().size();
if (iter->second.is_outstanding()) { if (iter->second.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data()); outstanding_bytes_ -= GetSerializedChunkSize(iter->second.data());
--outstanding_items_;
} }
if (iter->second.should_be_retransmitted()) { if (iter->second.should_be_retransmitted()) {
to_be_retransmitted_.erase(iter->first); to_be_retransmitted_.erase(iter->first);
@ -517,6 +522,7 @@ bool RetransmissionQueue::NackItem(UnwrappedTSN tsn,
bool retransmit_now) { bool retransmit_now) {
if (item.is_outstanding()) { if (item.is_outstanding()) {
outstanding_bytes_ -= GetSerializedChunkSize(item.data()); outstanding_bytes_ -= GetSerializedChunkSize(item.data());
--outstanding_items_;
} }
switch (item.Nack(retransmit_now)) { switch (item.Nack(retransmit_now)) {
@ -555,6 +561,7 @@ RetransmissionQueue::GetChunksToBeRetransmitted(size_t max_size) {
result.emplace_back(tsn.Wrap(), item.data().Clone()); result.emplace_back(tsn.Wrap(), item.data().Clone());
max_size -= serialized_size; max_size -= serialized_size;
outstanding_bytes_ += serialized_size; outstanding_bytes_ += serialized_size;
++outstanding_items_;
it = to_be_retransmitted_.erase(it); it = to_be_retransmitted_.erase(it);
} else { } else {
++it; ++it;
@ -624,6 +631,7 @@ std::vector<std::pair<TSN, Data>> RetransmissionQueue::GetChunksToSend(
size_t chunk_size = GetSerializedChunkSize(chunk_opt->data); size_t chunk_size = GetSerializedChunkSize(chunk_opt->data);
max_bytes -= chunk_size; max_bytes -= chunk_size;
outstanding_bytes_ += chunk_size; outstanding_bytes_ += chunk_size;
++outstanding_items_;
rwnd_ -= chunk_size; rwnd_ -= chunk_size;
auto item_it = auto item_it =
outstanding_data_ outstanding_data_

View File

@ -115,6 +115,9 @@ class RetransmissionQueue {
// Returns the number of bytes of packets that are in-flight. // Returns the number of bytes of packets that are in-flight.
size_t outstanding_bytes() const { return outstanding_bytes_; } size_t outstanding_bytes() const { return outstanding_bytes_; }
// Returns the number of DATA chunks that are in-flight.
size_t outstanding_items() const { return outstanding_items_; }
// Given the current time `now`, it will evaluate if there are chunks that // Given the current time `now`, it will evaluate if there are chunks that
// have expired and that need to be discarded. It returns true if a // have expired and that need to be discarded. It returns true if a
// FORWARD-TSN should be sent. // FORWARD-TSN should be sent.
@ -381,6 +384,9 @@ class RetransmissionQueue {
std::set<UnwrappedTSN> to_be_retransmitted_; std::set<UnwrappedTSN> to_be_retransmitted_;
// The number of bytes that are in-flight (sent but not yet acked or nacked). // The number of bytes that are in-flight (sent but not yet acked or nacked).
size_t outstanding_bytes_ = 0; size_t outstanding_bytes_ = 0;
// The number of DATA chunks that are in-flight (sent but not yet acked or
// nacked).
size_t outstanding_items_ = 0;
}; };
} // namespace dcsctp } // namespace dcsctp

View File

@ -455,6 +455,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
queue.set_cwnd(kCwnd); queue.set_cwnd(kCwnd);
EXPECT_EQ(queue.cwnd(), kCwnd); EXPECT_EQ(queue.cwnd(), kCwnd);
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
std::vector<uint8_t> payload(1000); std::vector<uint8_t> payload(1000);
EXPECT_CALL(producer_, Produce) EXPECT_CALL(producer_, Produce)
@ -470,6 +471,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight))); Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
EXPECT_EQ(queue.outstanding_items(), 1u);
// Will force chunks to be retransmitted // Will force chunks to be retransmitted
queue.HandleT3RtxTimerExpiry(); queue.HandleT3RtxTimerExpiry();
@ -478,6 +480,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kToBeRetransmitted))); Pair(TSN(10), State::kToBeRetransmitted)));
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
std::vector<std::pair<TSN, Data>> chunks_to_rtx = std::vector<std::pair<TSN, Data>> chunks_to_rtx =
queue.GetChunksToSend(now_, 1500); queue.GetChunksToSend(now_, 1500);
@ -486,6 +489,7 @@ TEST_F(RetransmissionQueueTest, RetransmitsWhenSendBufferIsFullT3Expiry) {
ElementsAre(Pair(TSN(9), State::kAcked), // ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight))); Pair(TSN(10), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize); EXPECT_EQ(queue.outstanding_bytes(), payload.size() + DataChunk::kHeaderSize);
EXPECT_EQ(queue.outstanding_items(), 1u);
} }
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
@ -909,6 +913,7 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(11), State::kInFlight), // Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight))); Pair(TSN(12), State::kInFlight)));
EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u); EXPECT_EQ(queue.outstanding_bytes(), (16 + 4) * 3u);
EXPECT_EQ(queue.outstanding_items(), 3u);
// Mark the message as lost. // Mark the message as lost.
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
@ -923,16 +928,20 @@ TEST_F(RetransmissionQueueTest, AccountsNackedAbandonedChunksAsNotOutstanding) {
Pair(TSN(11), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned))); Pair(TSN(12), State::kAbandoned)));
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
// Now ACK those, one at a time. // Now ACK those, one at a time.
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(11), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_EQ(queue.outstanding_bytes(), 0u); EXPECT_EQ(queue.outstanding_bytes(), 0u);
EXPECT_EQ(queue.outstanding_items(), 0u);
} }
TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) { TEST_F(RetransmissionQueueTest, ExpireFromSendQueueWhenPartiallySent) {