dcsctp: Allocate TSN for end of abandoned message

If a not fully sent message is abandoned, there must be a TSN
representing the end of that message (even if that fragment is never
sent), as the receiver can otherwise reject the next sent message as it
hasn't seen any end of the previous one.

A long explanation can be found at
https://github.com/sctplab/usrsctp/issues/592#issuecomment-849047689

Bug: webrtc:12812
Change-Id: I09c571bd6dd2774b0c147d4e5ddac67d2aa64fea
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/220361
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34140}
This commit is contained in:
Victor Boivie
2021-05-26 23:46:41 +02:00
committed by WebRTC LUCI CQ
parent 9700d88b1a
commit 7d2e669a38
6 changed files with 143 additions and 17 deletions

View File

@ -31,7 +31,7 @@ class MockSendQueue : public SendQueue {
Produce,
(TimeMs now, size_t max_size),
(override));
MOCK_METHOD(void,
MOCK_METHOD(bool,
Discard,
(IsUnordered unordered, StreamID stream_id, MID message_id),
(override));

View File

@ -739,8 +739,35 @@ void RetransmissionQueue::ExpireChunks(TimeMs now) {
void RetransmissionQueue::ExpireAllFor(
const RetransmissionQueue::TxData& item) {
// Erase all remaining chunks from the producer, if any.
send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
item.data().message_id);
if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id,
item.data().message_id)) {
// There were remaining chunks to be produced for this message. Since the
// receiver may have already received all chunks (up till now) for this
// message, we can't just FORWARD-TSN to the last fragment in this
// (abandoned) message and start sending a new message, as the receiver will
// then see a new message before the end of the previous one was seen (or
// skipped over). So create a new fragment, representing the end, that the
// received will never see as it is abandoned immediately and used as cum
// TSN in the sent FORWARD-TSN.
UnwrappedTSN tsn = next_tsn_;
next_tsn_.Increment();
Data message_end(item.data().stream_id, item.data().ssn,
item.data().message_id, item.data().fsn, item.data().ppid,
std::vector<uint8_t>(), Data::IsBeginning(false),
Data::IsEnd(true), item.data().is_unordered);
TxData& added_item =
outstanding_data_
.emplace(tsn, RetransmissionQueue::TxData(std::move(message_end),
absl::nullopt, TimeMs(0),
absl::nullopt))
.first->second;
// The added chunk shouldn't be included in `outstanding_bytes`, so set it
// as acked.
added_item.Ack();
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "Adding unsent end placeholder for message at tsn="
<< *tsn.Wrap();
}
for (auto& elem : outstanding_data_) {
UnwrappedTSN tsn = elem.first;
TxData& other = elem.second;

View File

@ -42,6 +42,7 @@ using ::testing::ElementsAre;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::Pair;
using ::testing::Return;
using ::testing::SizeIs;
using ::testing::UnorderedElementsAre;
@ -530,9 +531,68 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) {
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
// NOTE: The TSN=13 represents the end fragment.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAbandoned)));
ForwardTsnChunk forward_tsn = queue.CreateForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13));
EXPECT_THAT(forward_tsn.skipped_streams(),
UnorderedElementsAre(
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42))));
}
TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) {
RetransmissionQueue queue = CreateQueue();
EXPECT_CALL(producer_, Produce)
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B"));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, ""));
dts.max_retransmissions = 0;
return dts;
})
.WillOnce([this](TimeMs, size_t) {
SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E"));
dts.max_retransmissions = 0;
return dts;
})
.WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; });
// Send and ack first chunk (TSN 10)
std::vector<std::pair<TSN, Data>> chunks_to_send =
queue.GetChunksToSend(now_, 1000);
EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _),
Pair(TSN(12), _)));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(9), State::kAcked), //
Pair(TSN(10), State::kInFlight), //
Pair(TSN(11), State::kInFlight), //
Pair(TSN(12), State::kInFlight)));
// Chunk 10 is acked, but the remaining are lost
queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {}));
queue.HandleT3RtxTimerExpiry();
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kToBeRetransmitted), //
Pair(TSN(12), State::kToBeRetransmitted)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.WillOnce(Return(false));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
// NOTE: No additional TSN representing the end fragment, as that's TSN=12.
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(10), State::kAcked), //
Pair(TSN(11), State::kAbandoned), //
@ -609,11 +669,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
Pair(TSN(13), State::kAcked)));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42)))
.Times(1);
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42)))
.Times(1);
.WillOnce(Return(true));
EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42)))
.Times(1);
.WillOnce(Return(true));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
@ -621,12 +681,45 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) {
Pair(TSN(10), State::kAbandoned), //
Pair(TSN(11), State::kAbandoned), //
Pair(TSN(12), State::kAbandoned), //
Pair(TSN(13), State::kAcked)));
Pair(TSN(13), State::kAcked),
// Representing end fragments of stream 1-3
Pair(TSN(14), State::kAbandoned), //
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
IForwardTsnChunk forward_tsn = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12));
IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12));
EXPECT_THAT(
forward_tsn.skipped_streams(),
forward_tsn1.skipped_streams(),
UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(1), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(true), StreamID(2), MID(42)),
IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(3), MID(42))));
// When TSN 13 is acked, the placeholder "end fragments" must be skipped as
// well.
// A receiver is more likely to ack TSN 13, but do it incrementally.
queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {}));
EXPECT_CALL(producer_, Discard).Times(0);
EXPECT_FALSE(queue.ShouldSendForwardTsn(now_));
queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {}));
EXPECT_TRUE(queue.ShouldSendForwardTsn(now_));
EXPECT_THAT(queue.GetChunkStatesForTesting(),
ElementsAre(Pair(TSN(13), State::kAcked), //
Pair(TSN(14), State::kAbandoned), //
Pair(TSN(15), State::kAbandoned), //
Pair(TSN(16), State::kAbandoned)));
IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn();
EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16));
EXPECT_THAT(
forward_tsn2.skipped_streams(),
UnorderedElementsAre(IForwardTsnChunk::SkippedStream(
IsUnordered(false), StreamID(1), MID(42)),
IForwardTsnChunk::SkippedStream(

View File

@ -135,15 +135,18 @@ size_t RRSendQueue::OutgoingStream::buffered_amount() const {
return bytes;
}
void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
MID message_id) {
if (!items_.empty()) {
Item& item = items_.front();
if (item.send_options.unordered == unordered &&
item.message_id.has_value() && *item.message_id == message_id) {
items_.pop_front();
// As the item still existed, it had unsent data.
return true;
}
}
return false;
}
void RRSendQueue::OutgoingStream::Pause() {
@ -262,10 +265,10 @@ absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
return absl::nullopt;
}
void RRSendQueue::Discard(IsUnordered unordered,
bool RRSendQueue::Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) {
GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
return GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
}
void RRSendQueue::PrepareResetStreams(rtc::ArrayView<const StreamID> streams) {

View File

@ -64,7 +64,7 @@ class RRSendQueue : public SendQueue {
// Implementation of `SendQueue`.
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
void Discard(IsUnordered unordered,
bool Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) override;
void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) override;
@ -92,7 +92,7 @@ class RRSendQueue : public SendQueue {
size_t buffered_amount() const;
// Discards a partially sent message, see `SendQueue::Discard`.
void Discard(IsUnordered unordered, MID message_id);
bool Discard(IsUnordered unordered, MID message_id);
// Pauses this stream, which is used before resetting it.
void Pause();

View File

@ -60,7 +60,10 @@ class SendQueue {
// receiver that any partially received message fragments should be skipped.
// This means that any remaining fragments in the Send Queue must be removed
// as well so that they are not sent.
virtual void Discard(IsUnordered unordered,
//
// This function returns true if this message had unsent fragments still in
// the queue that were discarded, and false if there were no such fragments.
virtual bool Discard(IsUnordered unordered,
StreamID stream_id,
MID message_id) = 0;