diff --git a/net/dcsctp/tx/mock_send_queue.h b/net/dcsctp/tx/mock_send_queue.h index 54f5fd275d..8c19262c25 100644 --- a/net/dcsctp/tx/mock_send_queue.h +++ b/net/dcsctp/tx/mock_send_queue.h @@ -31,7 +31,7 @@ class MockSendQueue : public SendQueue { Produce, (TimeMs now, size_t max_size), (override)); - MOCK_METHOD(void, + MOCK_METHOD(bool, Discard, (IsUnordered unordered, StreamID stream_id, MID message_id), (override)); diff --git a/net/dcsctp/tx/retransmission_queue.cc b/net/dcsctp/tx/retransmission_queue.cc index 17d73583d1..1253ff744b 100644 --- a/net/dcsctp/tx/retransmission_queue.cc +++ b/net/dcsctp/tx/retransmission_queue.cc @@ -739,8 +739,35 @@ void RetransmissionQueue::ExpireChunks(TimeMs now) { void RetransmissionQueue::ExpireAllFor( const RetransmissionQueue::TxData& item) { // Erase all remaining chunks from the producer, if any. - send_queue_.Discard(item.data().is_unordered, item.data().stream_id, - item.data().message_id); + if (send_queue_.Discard(item.data().is_unordered, item.data().stream_id, + item.data().message_id)) { + // There were remaining chunks to be produced for this message. Since the + // receiver may have already received all chunks (up till now) for this + // message, we can't just FORWARD-TSN to the last fragment in this + // (abandoned) message and start sending a new message, as the receiver will + // then see a new message before the end of the previous one was seen (or + // skipped over). So create a new fragment, representing the end, that the + // received will never see as it is abandoned immediately and used as cum + // TSN in the sent FORWARD-TSN. + UnwrappedTSN tsn = next_tsn_; + next_tsn_.Increment(); + Data message_end(item.data().stream_id, item.data().ssn, + item.data().message_id, item.data().fsn, item.data().ppid, + std::vector(), Data::IsBeginning(false), + Data::IsEnd(true), item.data().is_unordered); + TxData& added_item = + outstanding_data_ + .emplace(tsn, RetransmissionQueue::TxData(std::move(message_end), + absl::nullopt, TimeMs(0), + absl::nullopt)) + .first->second; + // The added chunk shouldn't be included in `outstanding_bytes`, so set it + // as acked. + added_item.Ack(); + RTC_DLOG(LS_VERBOSE) << log_prefix_ + << "Adding unsent end placeholder for message at tsn=" + << *tsn.Wrap(); + } for (auto& elem : outstanding_data_) { UnwrappedTSN tsn = elem.first; TxData& other = elem.second; diff --git a/net/dcsctp/tx/retransmission_queue_test.cc b/net/dcsctp/tx/retransmission_queue_test.cc index f7368d1c96..48bbcbc68a 100644 --- a/net/dcsctp/tx/retransmission_queue_test.cc +++ b/net/dcsctp/tx/retransmission_queue_test.cc @@ -42,6 +42,7 @@ using ::testing::ElementsAre; using ::testing::IsEmpty; using ::testing::NiceMock; using ::testing::Pair; +using ::testing::Return; using ::testing::SizeIs; using ::testing::UnorderedElementsAre; @@ -530,9 +531,68 @@ TEST_F(RetransmissionQueueTest, ProducesValidForwardTsn) { Pair(TSN(12), State::kToBeRetransmitted))); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + .WillOnce(Return(true)); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + // NOTE: The TSN=13 represents the end fragment. + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kAbandoned), // + Pair(TSN(12), State::kAbandoned), // + Pair(TSN(13), State::kAbandoned))); + + ForwardTsnChunk forward_tsn = queue.CreateForwardTsn(); + EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(13)); + EXPECT_THAT(forward_tsn.skipped_streams(), + UnorderedElementsAre( + ForwardTsnChunk::SkippedStream(StreamID(1), SSN(42)))); +} + +TEST_F(RetransmissionQueueTest, ProducesValidForwardTsnWhenFullySent) { + RetransmissionQueue queue = CreateQueue(); + EXPECT_CALL(producer_, Produce) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({1, 2, 3, 4}, "B")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({5, 6, 7, 8}, "")); + dts.max_retransmissions = 0; + return dts; + }) + .WillOnce([this](TimeMs, size_t) { + SendQueue::DataToSend dts(gen_.Ordered({9, 10, 11, 12}, "E")); + dts.max_retransmissions = 0; + return dts; + }) + .WillRepeatedly([](TimeMs, size_t) { return absl::nullopt; }); + + // Send and ack first chunk (TSN 10) + std::vector> chunks_to_send = + queue.GetChunksToSend(now_, 1000); + EXPECT_THAT(chunks_to_send, ElementsAre(Pair(TSN(10), _), Pair(TSN(11), _), + Pair(TSN(12), _))); + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(9), State::kAcked), // + Pair(TSN(10), State::kInFlight), // + Pair(TSN(11), State::kInFlight), // + Pair(TSN(12), State::kInFlight))); + + // Chunk 10 is acked, but the remaining are lost + queue.HandleSack(now_, SackChunk(TSN(10), kArwnd, {}, {})); + queue.HandleT3RtxTimerExpiry(); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(10), State::kAcked), // + Pair(TSN(11), State::kToBeRetransmitted), // + Pair(TSN(12), State::kToBeRetransmitted))); + + EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) + .WillOnce(Return(false)); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + // NOTE: No additional TSN representing the end fragment, as that's TSN=12. EXPECT_THAT(queue.GetChunkStatesForTesting(), ElementsAre(Pair(TSN(10), State::kAcked), // Pair(TSN(11), State::kAbandoned), // @@ -609,11 +669,11 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { Pair(TSN(13), State::kAcked))); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(1), MID(42))) - .Times(1); + .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(IsUnordered(true), StreamID(2), MID(42))) - .Times(1); + .WillOnce(Return(true)); EXPECT_CALL(producer_, Discard(IsUnordered(false), StreamID(3), MID(42))) - .Times(1); + .WillOnce(Return(true)); EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); EXPECT_THAT(queue.GetChunkStatesForTesting(), @@ -621,12 +681,45 @@ TEST_F(RetransmissionQueueTest, ProducesValidIForwardTsn) { Pair(TSN(10), State::kAbandoned), // Pair(TSN(11), State::kAbandoned), // Pair(TSN(12), State::kAbandoned), // - Pair(TSN(13), State::kAcked))); + Pair(TSN(13), State::kAcked), + // Representing end fragments of stream 1-3 + Pair(TSN(14), State::kAbandoned), // + Pair(TSN(15), State::kAbandoned), // + Pair(TSN(16), State::kAbandoned))); - IForwardTsnChunk forward_tsn = queue.CreateIForwardTsn(); - EXPECT_EQ(forward_tsn.new_cumulative_tsn(), TSN(12)); + IForwardTsnChunk forward_tsn1 = queue.CreateIForwardTsn(); + EXPECT_EQ(forward_tsn1.new_cumulative_tsn(), TSN(12)); EXPECT_THAT( - forward_tsn.skipped_streams(), + forward_tsn1.skipped_streams(), + UnorderedElementsAre(IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(1), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(true), StreamID(2), MID(42)), + IForwardTsnChunk::SkippedStream( + IsUnordered(false), StreamID(3), MID(42)))); + + // When TSN 13 is acked, the placeholder "end fragments" must be skipped as + // well. + + // A receiver is more likely to ack TSN 13, but do it incrementally. + queue.HandleSack(now_, SackChunk(TSN(12), kArwnd, {}, {})); + + EXPECT_CALL(producer_, Discard).Times(0); + EXPECT_FALSE(queue.ShouldSendForwardTsn(now_)); + + queue.HandleSack(now_, SackChunk(TSN(13), kArwnd, {}, {})); + EXPECT_TRUE(queue.ShouldSendForwardTsn(now_)); + + EXPECT_THAT(queue.GetChunkStatesForTesting(), + ElementsAre(Pair(TSN(13), State::kAcked), // + Pair(TSN(14), State::kAbandoned), // + Pair(TSN(15), State::kAbandoned), // + Pair(TSN(16), State::kAbandoned))); + + IForwardTsnChunk forward_tsn2 = queue.CreateIForwardTsn(); + EXPECT_EQ(forward_tsn2.new_cumulative_tsn(), TSN(16)); + EXPECT_THAT( + forward_tsn2.skipped_streams(), UnorderedElementsAre(IForwardTsnChunk::SkippedStream( IsUnordered(false), StreamID(1), MID(42)), IForwardTsnChunk::SkippedStream( diff --git a/net/dcsctp/tx/rr_send_queue.cc b/net/dcsctp/tx/rr_send_queue.cc index 7f913393c8..c7303221b0 100644 --- a/net/dcsctp/tx/rr_send_queue.cc +++ b/net/dcsctp/tx/rr_send_queue.cc @@ -135,15 +135,18 @@ size_t RRSendQueue::OutgoingStream::buffered_amount() const { return bytes; } -void RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, +bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered, MID message_id) { if (!items_.empty()) { Item& item = items_.front(); if (item.send_options.unordered == unordered && item.message_id.has_value() && *item.message_id == message_id) { items_.pop_front(); + // As the item still existed, it had unsent data. + return true; } } + return false; } void RRSendQueue::OutgoingStream::Pause() { @@ -262,10 +265,10 @@ absl::optional RRSendQueue::Produce(TimeMs now, return absl::nullopt; } -void RRSendQueue::Discard(IsUnordered unordered, +bool RRSendQueue::Discard(IsUnordered unordered, StreamID stream_id, MID message_id) { - GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); + return GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id); } void RRSendQueue::PrepareResetStreams(rtc::ArrayView streams) { diff --git a/net/dcsctp/tx/rr_send_queue.h b/net/dcsctp/tx/rr_send_queue.h index abbe70205d..2b9389f68f 100644 --- a/net/dcsctp/tx/rr_send_queue.h +++ b/net/dcsctp/tx/rr_send_queue.h @@ -64,7 +64,7 @@ class RRSendQueue : public SendQueue { // Implementation of `SendQueue`. absl::optional Produce(TimeMs now, size_t max_size) override; - void Discard(IsUnordered unordered, + bool Discard(IsUnordered unordered, StreamID stream_id, MID message_id) override; void PrepareResetStreams(rtc::ArrayView streams) override; @@ -92,7 +92,7 @@ class RRSendQueue : public SendQueue { size_t buffered_amount() const; // Discards a partially sent message, see `SendQueue::Discard`. - void Discard(IsUnordered unordered, MID message_id); + bool Discard(IsUnordered unordered, MID message_id); // Pauses this stream, which is used before resetting it. void Pause(); diff --git a/net/dcsctp/tx/send_queue.h b/net/dcsctp/tx/send_queue.h index bb5aab2df8..f48c0c75a6 100644 --- a/net/dcsctp/tx/send_queue.h +++ b/net/dcsctp/tx/send_queue.h @@ -60,7 +60,10 @@ class SendQueue { // receiver that any partially received message fragments should be skipped. // This means that any remaining fragments in the Send Queue must be removed // as well so that they are not sent. - virtual void Discard(IsUnordered unordered, + // + // This function returns true if this message had unsent fragments still in + // the queue that were discarded, and false if there were no such fragments. + virtual bool Discard(IsUnordered unordered, StreamID stream_id, MID message_id) = 0;