dcsctp: Report acked/abandoned messages when acked

For all messages where the last fragment was _not_ put on the wire, the
send queue is responsible for generating lifecycle events, but once all
fragments have been put on the wire, it's the retransmission queue that
is responsible. It does that by marking the final fragment of a message
with the lifecycle identifier, and once that message has been fully
acked by the cumulative ack TSN, it's clear that the peer has fully
received all fragments of the message, as the last fragment was acked.

For abandoned messages - where FORWARD-TSNs are sent, those will be
replied to by a SACK as well, and then we report abandoned messages
separately, to later trigger `OnLifecycleMessageExpired`.

This CL adds support in OutstandingData, which doesn't generate the
callbacks itself, but just reports them in the AckInfo.

Bug: webrtc:5696
Change-Id: I64092f13bcfda685443b7df9967b04d54aedd36a
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264124
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37448}
This commit is contained in:
Victor Boivie
2022-05-27 14:30:24 +02:00
committed by WebRTC LUCI CQ
parent 72b12d42a6
commit 4a93da315b
3 changed files with 139 additions and 8 deletions

View File

@ -31,7 +31,9 @@ size_t OutstandingData::GetSerializedChunkSize(const Data& data) const {
}
void OutstandingData::Item::Ack() {
if (lifecycle_ != Lifecycle::kAbandoned) {
lifecycle_ = Lifecycle::kActive;
}
ack_state_ = AckState::kAcked;
}
@ -145,6 +147,14 @@ void OutstandingData::RemoveAcked(UnwrappedTSN cumulative_tsn_ack,
for (auto iter = outstanding_data_.begin(); iter != first_unacked; ++iter) {
AckChunk(ack_info, iter);
if (iter->second.lifecycle_id().IsSet()) {
RTC_DCHECK(iter->second.data().is_end);
if (iter->second.is_abandoned()) {
ack_info.abandoned_lifecycle_ids.push_back(iter->second.lifecycle_id());
} else {
ack_info.acked_lifecycle_ids.push_back(iter->second.lifecycle_id());
}
}
}
outstanding_data_.erase(outstanding_data_.begin(), first_unacked);
@ -268,7 +278,8 @@ void OutstandingData::AbandonAllFor(const Item& item) {
.emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
std::forward_as_tuple(std::move(message_end), TimeMs(0),
MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture()))
TimeMs::InfiniteFuture(),
LifecycleId::NotSet()))
.first->second;
// The added chunk shouldn't be included in `outstanding_bytes`, so set it
// as acked.
@ -384,7 +395,8 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
const Data& data,
TimeMs time_sent,
MaxRetransmits max_retransmissions,
TimeMs expires_at) {
TimeMs expires_at,
LifecycleId lifecycle_id) {
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();
@ -395,7 +407,8 @@ absl::optional<UnwrappedTSN> OutstandingData::Insert(
auto it = outstanding_data_
.emplace(std::piecewise_construct, std::forward_as_tuple(tsn),
std::forward_as_tuple(data.Clone(), time_sent,
max_retransmissions, expires_at))
max_retransmissions, expires_at,
lifecycle_id))
.first;
if (it->second.has_expired(time_sent)) {

View File

@ -63,6 +63,11 @@ class OutstandingData {
// Highest TSN Newly Acknowledged, an SCTP variable.
UnwrappedTSN highest_tsn_acked;
// The set of lifecycle IDs that were acked using cumulative_tsn_ack.
std::vector<LifecycleId> acked_lifecycle_ids;
// The set of lifecycle IDs that were acked, but had been abandoned.
std::vector<LifecycleId> abandoned_lifecycle_ids;
};
OutstandingData(
@ -125,7 +130,8 @@ class OutstandingData {
const Data& data,
TimeMs time_sent,
MaxRetransmits max_retransmissions = MaxRetransmits::NoLimit(),
TimeMs expires_at = TimeMs::InfiniteFuture());
TimeMs expires_at = TimeMs::InfiniteFuture(),
LifecycleId lifecycle_id = LifecycleId::NotSet());
// Nacks all outstanding data.
void NackAll();
@ -167,10 +173,12 @@ class OutstandingData {
Item(Data data,
TimeMs time_sent,
MaxRetransmits max_retransmissions,
TimeMs expires_at)
TimeMs expires_at,
LifecycleId lifecycle_id)
: time_sent_(time_sent),
max_retransmissions_(max_retransmissions),
expires_at_(expires_at),
lifecycle_id_(lifecycle_id),
data_(std::move(data)) {}
Item(const Item&) = delete;
@ -212,6 +220,8 @@ class OutstandingData {
// indicate if it has expired (SCTP Partial Reliability Extension).
bool has_expired(TimeMs now) const;
LifecycleId lifecycle_id() const { return lifecycle_id_; }
private:
enum class Lifecycle : uint8_t {
// The chunk is alive (sent, received, etc)
@ -240,8 +250,6 @@ class OutstandingData {
// set to that number. The value zero (0) means that it will never be
// retransmitted.
const MaxRetransmits max_retransmissions_;
// At this exact millisecond, the item is considered expired. If the message
// is not to be expired, this is set to the infinite future.
// Indicates the life cycle status of this chunk.
Lifecycle lifecycle_ = Lifecycle::kActive;
@ -255,8 +263,13 @@ class OutstandingData {
// The number of times the DATA chunk has been retransmitted.
uint16_t num_retransmissions_ = 0;
// At this exact millisecond, the item is considered expired. If the message
// is not to be expired, this is set to the infinite future.
const TimeMs expires_at_;
// An optional lifecycle id, which may only be set for the last fragment.
const LifecycleId lifecycle_id_;
// The actual data to send/retransmit.
const Data data_;
};

View File

@ -482,5 +482,110 @@ TEST_F(OutstandingDataTest, CanAbandonChunksMarkedForFastRetransmit) {
EXPECT_THAT(buf_.GetChunksToBeFastRetransmitted(1000), IsEmpty());
EXPECT_THAT(buf_.GetChunksToBeRetransmitted(1000), IsEmpty());
}
TEST_F(OutstandingDataTest, LifecyleReturnsAckedItemsInAckInfo) {
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(42));
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(43));
buf_.Insert(gen_.Ordered({1}, "BE"), kNow, MaxRetransmits::NoLimit(),
TimeMs::InfiniteFuture(), LifecycleId(44));
OutstandingData::AckInfo ack1 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(11)), {}, false);
EXPECT_THAT(ack1.acked_lifecycle_ids,
ElementsAre(LifecycleId(42), LifecycleId(43)));
OutstandingData::AckInfo ack2 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(12)), {}, false);
EXPECT_THAT(ack2.acked_lifecycle_ids, ElementsAre(LifecycleId(44)));
}
TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedNackedThreeTimes) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
TimeMs::InfiniteFuture(), LifecycleId(42));
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 2)};
EXPECT_FALSE(
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
std::vector<SackChunk::GapAckBlock> gab2 = {SackChunk::GapAckBlock(2, 3)};
EXPECT_FALSE(
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab2, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
std::vector<SackChunk::GapAckBlock> gab3 = {SackChunk::GapAckBlock(2, 4)};
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
OutstandingData::AckInfo ack1 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab3, false);
EXPECT_TRUE(ack1.has_packet_loss);
EXPECT_THAT(ack1.abandoned_lifecycle_ids, IsEmpty());
// This will generate a FORWARD-TSN, which is acked
EXPECT_TRUE(buf_.ShouldSendForwardTsn());
ForwardTsnChunk chunk = buf_.CreateForwardTsn();
EXPECT_EQ(chunk.new_cumulative_tsn(), TSN(13));
OutstandingData::AckInfo ack2 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(13)), {}, false);
EXPECT_FALSE(ack2.has_packet_loss);
EXPECT_THAT(ack2.abandoned_lifecycle_ids, ElementsAre(LifecycleId(42)));
}
TEST_F(OutstandingDataTest, LifecycleReturnsAbandonedAfterT3rtxExpired) {
buf_.Insert(gen_.Ordered({1}, "B"), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, ""), kNow, MaxRetransmits(0));
buf_.Insert(gen_.Ordered({1}, "E"), kNow, MaxRetransmits(0),
TimeMs::InfiniteFuture(), LifecycleId(42));
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight), //
Pair(TSN(13), State::kInFlight)));
std::vector<SackChunk::GapAckBlock> gab1 = {SackChunk::GapAckBlock(2, 4)};
EXPECT_FALSE(
buf_.HandleSack(unwrapper_.Unwrap(TSN(9)), gab1, false).has_packet_loss);
EXPECT_FALSE(buf_.has_data_to_be_retransmitted());
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kNacked), //
Pair(TSN(11), State::kAcked), //
Pair(TSN(12), State::kAcked), //
Pair(TSN(13), State::kAcked)));
// T3-rtx triggered.
EXPECT_CALL(on_discard_, Call(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
buf_.NackAll();
EXPECT_THAT(buf_.GetChunkStatesForTesting(),
testing::ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAbandoned)));
// This will generate a FORWARD-TSN, which is acked
EXPECT_TRUE(buf_.ShouldSendForwardTsn());
ForwardTsnChunk chunk = buf_.CreateForwardTsn();
EXPECT_EQ(chunk.new_cumulative_tsn(), TSN(13));
OutstandingData::AckInfo ack2 =
buf_.HandleSack(unwrapper_.Unwrap(TSN(13)), {}, false);
EXPECT_FALSE(ack2.has_packet_loss);
EXPECT_THAT(ack2.abandoned_lifecycle_ids, ElementsAre(LifecycleId(42)));
}
} // namespace
} // namespace dcsctp