dssctp: support socket handover in StreamResetHandler

Bug: webrtc:13154
Change-Id: Idafbed4f3c1af8d0cca833ba983c4b4b99118335
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/232121
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Sergey Sukhanov <sergeysu@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#35012}
This commit is contained in:
Sergey Sukhanov
2021-09-16 07:27:47 +02:00
committed by WebRTC LUCI CQ
parent 17b7a68bd2
commit 3b08fe3dcd
4 changed files with 267 additions and 83 deletions

View File

@ -46,6 +46,7 @@ struct DcSctpSocketHandoverState {
uint32_t last_cumulative_acked_tsn = 0;
uint32_t last_assembled_tsn = 0;
uint32_t last_completed_deferred_reset_req_sn = 0;
uint32_t last_completed_reset_req_sn = 0;
std::vector<OrderedStream> ordered_streams;
std::vector<UnorderedStream> unordered_streams;
};

View File

@ -343,4 +343,20 @@ absl::optional<DurationMs> StreamResetHandler::OnReconfigTimerExpiry() {
return ctx_->current_rto();
}
HandoverReadinessStatus StreamResetHandler::GetHandoverReadiness() const {
HandoverReadinessStatus status;
if (!streams_to_reset_.empty()) {
status.Add(HandoverUnreadinessReason::kPendingStreamReset);
}
if (current_request_.has_value()) {
status.Add(HandoverUnreadinessReason::kPendingStreamResetRequest);
}
return status;
}
void StreamResetHandler::AddHandoverState(DcSctpSocketHandoverState& state) {
state.rx.last_completed_reset_req_sn = last_processed_req_seq_nbr_.value();
state.tx.next_reset_req_sn = next_outgoing_req_seq_nbr_.value();
}
} // namespace dcsctp

View File

@ -70,7 +70,8 @@ class StreamResetHandler {
TimerManager* timer_manager,
DataTracker* data_tracker,
ReassemblyQueue* reassembly_queue,
RetransmissionQueue* retransmission_queue)
RetransmissionQueue* retransmission_queue,
const DcSctpSocketHandoverState* handover_state = nullptr)
: log_prefix_(std::string(log_prefix) + "reset: "),
ctx_(context),
data_tracker_(data_tracker),
@ -80,9 +81,15 @@ class StreamResetHandler {
"re-config",
absl::bind_front(&StreamResetHandler::OnReconfigTimerExpiry, this),
TimerOptions(DurationMs(0)))),
next_outgoing_req_seq_nbr_(ReconfigRequestSN(*ctx_->my_initial_tsn())),
next_outgoing_req_seq_nbr_(
handover_state
? ReconfigRequestSN(handover_state->tx.next_reset_req_sn)
: ReconfigRequestSN(*ctx_->my_initial_tsn())),
last_processed_req_seq_nbr_(
ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1)) {}
handover_state ? ReconfigRequestSN(
handover_state->rx.last_completed_reset_req_sn)
: ReconfigRequestSN(*ctx_->peer_initial_tsn() - 1)) {
}
// Initiates reset of the provided streams. While there can only be one
// ongoing stream reset request at any time, this method can be called at any
@ -100,6 +107,10 @@ class StreamResetHandler {
// Called when handling and incoming RE-CONFIG chunk.
void HandleReConfig(ReConfigChunk chunk);
HandoverReadinessStatus GetHandoverReadiness() const;
void AddHandoverState(DcSctpSocketHandoverState& state);
private:
// Represents a stream request operation. There can only be one ongoing at
// any time, and a sent request may either succeed, fail or result in the

View File

@ -38,7 +38,6 @@
namespace dcsctp {
namespace {
using ::testing::_;
using ::testing::IsEmpty;
using ::testing::NiceMock;
using ::testing::Return;
@ -96,9 +95,13 @@ class StreamResetHandlerTest : public testing::Test {
"test/t3_rtx",
[]() { return absl::nullopt; },
TimerOptions(DurationMs(0)))),
buf_("log: ", delayed_ack_timer_.get(), kPeerInitialTsn),
reasm_("log: ", kPeerInitialTsn, kArwnd),
retransmission_queue_(
data_tracker_(std::make_unique<DataTracker>("log: ",
delayed_ack_timer_.get(),
kPeerInitialTsn)),
reasm_(std::make_unique<ReassemblyQueue>("log: ",
kPeerInitialTsn,
kArwnd)),
retransmission_queue_(std::make_unique<RetransmissionQueue>(
"",
kMyInitialTsn,
kArwnd,
@ -106,13 +109,14 @@ class StreamResetHandlerTest : public testing::Test {
[](DurationMs rtt_ms) {},
[]() {},
*t3_rtx_timer_,
/*options=*/{}),
handler_("log: ",
&ctx_,
&timer_manager_,
&buf_,
&reasm_,
&retransmission_queue_) {
DcSctpOptions())),
handler_(
std::make_unique<StreamResetHandler>("log: ",
&ctx_,
&timer_manager_,
data_tracker_.get(),
reasm_.get(),
retransmission_queue_.get())) {
EXPECT_CALL(ctx_, current_rto).WillRepeatedly(Return(kRto));
}
@ -131,7 +135,7 @@ class StreamResetHandlerTest : public testing::Test {
// that are sent in the response RE-CONFIG.
std::vector<ReconfigurationResponseParameter> HandleAndCatchResponse(
ReConfigChunk chunk) {
handler_.HandleReConfig(std::move(chunk));
handler_->HandleReConfig(std::move(chunk));
std::vector<uint8_t> payload = callbacks_.ConsumeSentPacket();
if (payload.empty()) {
@ -169,6 +173,33 @@ class StreamResetHandlerTest : public testing::Test {
return responses;
}
void PerformHandover() {
EXPECT_TRUE(handler_->GetHandoverReadiness().IsReady());
EXPECT_TRUE(data_tracker_->GetHandoverReadiness().IsReady());
EXPECT_TRUE(reasm_->GetHandoverReadiness().IsReady());
EXPECT_TRUE(retransmission_queue_->GetHandoverReadiness().IsReady());
DcSctpSocketHandoverState state;
handler_->AddHandoverState(state);
data_tracker_->AddHandoverState(state);
reasm_->AddHandoverState(state);
retransmission_queue_->AddHandoverState(state);
data_tracker_ = std::make_unique<DataTracker>(
"log: ", delayed_ack_timer_.get(), kPeerInitialTsn, &state);
reasm_ = std::make_unique<ReassemblyQueue>("log: ", kPeerInitialTsn, kArwnd,
&state);
retransmission_queue_ = std::make_unique<RetransmissionQueue>(
"", kMyInitialTsn, kArwnd, producer_, [](DurationMs rtt_ms) {}, []() {},
*t3_rtx_timer_, DcSctpOptions(),
/*supports_partial_reliability=*/true,
/*use_message_interleaving=*/false, &state);
handler_ = std::make_unique<StreamResetHandler>(
"log: ", &ctx_, &timer_manager_, data_tracker_.get(), reasm_.get(),
retransmission_queue_.get(), &state);
}
DataGenerator gen_;
NiceMock<MockDcSctpSocketCallbacks> callbacks_;
NiceMock<MockContext> ctx_;
@ -176,16 +207,16 @@ class StreamResetHandlerTest : public testing::Test {
TimerManager timer_manager_;
std::unique_ptr<Timer> delayed_ack_timer_;
std::unique_ptr<Timer> t3_rtx_timer_;
DataTracker buf_;
ReassemblyQueue reasm_;
RetransmissionQueue retransmission_queue_;
StreamResetHandler handler_;
std::unique_ptr<DataTracker> data_tracker_;
std::unique_ptr<ReassemblyQueue> reasm_;
std::unique_ptr<RetransmissionQueue> retransmission_queue_;
std::unique_ptr<StreamResetHandler> handler_;
};
TEST_F(StreamResetHandlerTest, ChunkWithNoParametersReturnsError) {
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
EXPECT_CALL(callbacks_, OnError).Times(1);
handler_.HandleReConfig(ReConfigChunk(Parameters()));
handler_->HandleReConfig(ReConfigChunk(Parameters()));
}
TEST_F(StreamResetHandlerTest, ChunkWithInvalidParametersReturnsError) {
@ -200,32 +231,32 @@ TEST_F(StreamResetHandlerTest, ChunkWithInvalidParametersReturnsError) {
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
EXPECT_CALL(callbacks_, OnError).Times(1);
handler_.HandleReConfig(ReConfigChunk(builder.Build()));
handler_->HandleReConfig(ReConfigChunk(builder.Build()));
}
TEST_F(StreamResetHandlerTest, FailToDeliverWithoutResettingStream) {
reasm_.Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_.Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
buf_.Observe(kPeerInitialTsn);
buf_.Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_.FlushMessages(),
data_tracker_->Observe(kPeerInitialTsn);
data_tracker_->Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_->FlushMessages(),
UnorderedElementsAre(
SctpMessageIs(StreamID(1), PPID(53), kShortPayload),
SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
gen_.ResetStream();
reasm_.Add(AddTo(kPeerInitialTsn, 2), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm_.FlushMessages(), IsEmpty());
reasm_->Add(AddTo(kPeerInitialTsn, 2), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm_->FlushMessages(), IsEmpty());
}
TEST_F(StreamResetHandlerTest, ResetStreamsNotDeferred) {
reasm_.Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_.Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
buf_.Observe(kPeerInitialTsn);
buf_.Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_.FlushMessages(),
data_tracker_->Observe(kPeerInitialTsn);
data_tracker_->Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_->FlushMessages(),
UnorderedElementsAre(
SctpMessageIs(StreamID(1), PPID(53), kShortPayload),
SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
@ -241,8 +272,8 @@ TEST_F(StreamResetHandlerTest, ResetStreamsNotDeferred) {
EXPECT_EQ(responses[0].result(), ResponseResult::kSuccessPerformed);
gen_.ResetStream();
reasm_.Add(AddTo(kPeerInitialTsn, 2), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm_.FlushMessages(),
reasm_->Add(AddTo(kPeerInitialTsn, 2), gen_.Ordered({1, 2, 3, 4}, "BE"));
EXPECT_THAT(reasm_->FlushMessages(),
UnorderedElementsAre(
SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
}
@ -250,14 +281,15 @@ TEST_F(StreamResetHandlerTest, ResetStreamsNotDeferred) {
TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
DataGeneratorOptions opts;
opts.message_id = MID(0);
reasm_.Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE", opts));
opts.message_id = MID(1);
reasm_.Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->Add(AddTo(kPeerInitialTsn, 1),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
buf_.Observe(kPeerInitialTsn);
buf_.Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_.FlushMessages(),
data_tracker_->Observe(kPeerInitialTsn);
data_tracker_->Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_->FlushMessages(),
UnorderedElementsAre(
SctpMessageIs(StreamID(1), PPID(53), kShortPayload),
SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
@ -274,26 +306,30 @@ TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
opts.message_id = MID(1);
opts.ppid = PPID(5);
reasm_.Add(AddTo(kPeerInitialTsn, 5), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_.MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
reasm_->Add(AddTo(kPeerInitialTsn, 5),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(0);
opts.ppid = PPID(4);
reasm_.Add(AddTo(kPeerInitialTsn, 4), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_.MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
reasm_->Add(AddTo(kPeerInitialTsn, 4),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(3);
opts.ppid = PPID(3);
reasm_.Add(AddTo(kPeerInitialTsn, 3), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_.MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
reasm_->Add(AddTo(kPeerInitialTsn, 3),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 1));
opts.message_id = MID(2);
opts.ppid = PPID(2);
reasm_.Add(AddTo(kPeerInitialTsn, 2), gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_.MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 5));
reasm_->Add(AddTo(kPeerInitialTsn, 2),
gen_.Ordered({1, 2, 3, 4}, "BE", opts));
reasm_->MaybeResetStreamsDeferred(AddTo(kPeerInitialTsn, 5));
EXPECT_THAT(
reasm_.FlushMessages(),
reasm_->FlushMessages(),
UnorderedElementsAre(SctpMessageIs(StreamID(1), PPID(2), kShortPayload),
SctpMessageIs(StreamID(1), PPID(3), kShortPayload),
SctpMessageIs(StreamID(1), PPID(4), kShortPayload),
@ -302,10 +338,10 @@ TEST_F(StreamResetHandlerTest, ResetStreamsDeferred) {
TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
@ -313,19 +349,19 @@ TEST_F(StreamResetHandlerTest, SendOutgoingRequestDirectly) {
EXPECT_EQ(req.request_sequence_number(), kMyInitialReqSn);
EXPECT_EQ(req.sender_last_assigned_tsn(),
TSN(*retransmission_queue_.next_tsn() - 1));
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req.stream_ids(), UnorderedElementsAre(StreamID(42)));
}
TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(3);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_.ResetStreams(
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(
std::vector<StreamID>({StreamID(43), StreamID(44), StreamID(41)}));
handler_.ResetStreams(std::vector<StreamID>({StreamID(42), StreamID(40)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42), StreamID(40)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
@ -333,7 +369,7 @@ TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) {
EXPECT_EQ(req.request_sequence_number(), kMyInitialReqSn);
EXPECT_EQ(req.sender_last_assigned_tsn(),
TSN(*retransmission_queue_.next_tsn() - 1));
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req.stream_ids(),
UnorderedElementsAre(StreamID(40), StreamID(41), StreamID(42),
StreamID(43), StreamID(44)));
@ -341,25 +377,25 @@ TEST_F(StreamResetHandlerTest, ResetMultipleStreamsInOneRequest) {
TEST_F(StreamResetHandlerTest, SendOutgoingRequestDeferred) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams())
.WillOnce(Return(false))
.WillOnce(Return(false))
.WillOnce(Return(true));
EXPECT_FALSE(handler_.MakeStreamResetRequest().has_value());
EXPECT_FALSE(handler_.MakeStreamResetRequest().has_value());
EXPECT_TRUE(handler_.MakeStreamResetRequest().has_value());
EXPECT_FALSE(handler_->MakeStreamResetRequest().has_value());
EXPECT_FALSE(handler_->MakeStreamResetRequest().has_value());
EXPECT_TRUE(handler_->MakeStreamResetRequest().has_value());
}
TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
@ -376,16 +412,16 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResettingOnPositiveResponse) {
// Processing a response shouldn't result in sending anything.
EXPECT_CALL(callbacks_, OnError).Times(0);
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
handler_.HandleReConfig(std::move(response_reconfig));
handler_->HandleReConfig(std::move(response_reconfig));
}
TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
@ -402,18 +438,18 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRollbackOnError) {
// Only requests should result in sending responses.
EXPECT_CALL(callbacks_, OnError).Times(0);
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
handler_.HandleReConfig(std::move(response_reconfig));
handler_->HandleReConfig(std::move(response_reconfig));
}
TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) {
static constexpr StreamID kStreamToReset = StreamID(42);
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({kStreamToReset}));
handler_->ResetStreams(std::vector<StreamID>({kStreamToReset}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig1 = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig1 = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig1.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req1,
@ -431,7 +467,7 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) {
// Processing a response shouldn't result in sending anything.
EXPECT_CALL(callbacks_, OnError).Times(0);
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
handler_.HandleReConfig(std::move(response_reconfig));
handler_->HandleReConfig(std::move(response_reconfig));
// Let some time pass, so that the reconfig timer expires, and retries the
// same request.
@ -458,23 +494,23 @@ TEST_F(StreamResetHandlerTest, SendOutgoingResetRetransmitOnInProgress) {
TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_.ResetStreams(std::vector<StreamID>({StreamID(42)}));
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig1 = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig1 = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig1.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req1,
reconfig1->parameters().get<OutgoingSSNResetRequestParameter>());
EXPECT_EQ(req1.request_sequence_number(), kMyInitialReqSn);
EXPECT_EQ(req1.sender_last_assigned_tsn(),
AddTo(retransmission_queue_.next_tsn(), -1));
AddTo(retransmission_queue_->next_tsn(), -1));
EXPECT_THAT(req1.stream_ids(), UnorderedElementsAre(StreamID(42)));
// Streams reset while the request is in-flight will be queued.
StreamID stream_ids[] = {StreamID(41), StreamID(43)};
handler_.ResetStreams(stream_ids);
EXPECT_EQ(handler_.MakeStreamResetRequest(), absl::nullopt);
handler_->ResetStreams(stream_ids);
EXPECT_EQ(handler_->MakeStreamResetRequest(), absl::nullopt);
Parameters::Builder builder;
builder.Add(ReconfigurationResponseParameter(
@ -487,18 +523,18 @@ TEST_F(StreamResetHandlerTest, ResetWhileRequestIsSentWillQueue) {
// Processing a response shouldn't result in sending anything.
EXPECT_CALL(callbacks_, OnError).Times(0);
EXPECT_CALL(callbacks_, SendPacketWithStatus).Times(0);
handler_.HandleReConfig(std::move(response_reconfig));
handler_->HandleReConfig(std::move(response_reconfig));
// Response has been processed. A new request can be sent.
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig2 = handler_.MakeStreamResetRequest();
absl::optional<ReConfigChunk> reconfig2 = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig2.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req2,
reconfig2->parameters().get<OutgoingSSNResetRequestParameter>());
EXPECT_EQ(req2.request_sequence_number(), AddTo(kMyInitialReqSn, 1));
EXPECT_EQ(req2.sender_last_assigned_tsn(),
TSN(*retransmission_queue_.next_tsn() - 1));
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req2.stream_ids(),
UnorderedElementsAre(StreamID(41), StreamID(43)));
}
@ -516,12 +552,12 @@ TEST_F(StreamResetHandlerTest, SendIncomingResetJustReturnsNothingPerformed) {
}
TEST_F(StreamResetHandlerTest, SendSameRequestTwiceReturnsNothingToDo) {
reasm_.Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_.Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(kPeerInitialTsn, gen_.Ordered({1, 2, 3, 4}, "BE"));
reasm_->Add(AddTo(kPeerInitialTsn, 1), gen_.Ordered({1, 2, 3, 4}, "BE"));
buf_.Observe(kPeerInitialTsn);
buf_.Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_.FlushMessages(),
data_tracker_->Observe(kPeerInitialTsn);
data_tracker_->Observe(AddTo(kPeerInitialTsn, 1));
EXPECT_THAT(reasm_->FlushMessages(),
UnorderedElementsAre(
SctpMessageIs(StreamID(1), PPID(53), kShortPayload),
SctpMessageIs(StreamID(1), PPID(53), kShortPayload)));
@ -546,5 +582,125 @@ TEST_F(StreamResetHandlerTest, SendSameRequestTwiceReturnsNothingToDo) {
EXPECT_THAT(responses2, SizeIs(1));
EXPECT_EQ(responses2[0].result(), ResponseResult::kSuccessNothingToDo);
}
TEST_F(StreamResetHandlerTest,
HandoverIsAllowedOnlyWhenNoStreamIsBeingOrWillBeReset) {
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_EQ(
handler_->GetHandoverReadiness(),
HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value());
EXPECT_EQ(handler_->GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kPendingStreamResetRequest));
// Reset more streams while the request is in-flight.
StreamID stream_ids[] = {StreamID(41), StreamID(43)};
handler_->ResetStreams(stream_ids);
EXPECT_EQ(handler_->GetHandoverReadiness(),
HandoverReadinessStatus()
.Add(HandoverUnreadinessReason::kPendingStreamResetRequest)
.Add(HandoverUnreadinessReason::kPendingStreamReset));
// Processing a response to first request.
EXPECT_CALL(producer_, CommitResetStreams()).Times(1);
handler_->HandleReConfig(
ReConfigChunk(Parameters::Builder()
.Add(ReconfigurationResponseParameter(
kMyInitialReqSn, ResponseResult::kSuccessPerformed))
.Build()));
EXPECT_EQ(
handler_->GetHandoverReadiness(),
HandoverReadinessStatus(HandoverUnreadinessReason::kPendingStreamReset));
// Second request can be sent.
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
ASSERT_TRUE(handler_->MakeStreamResetRequest().has_value());
EXPECT_EQ(handler_->GetHandoverReadiness(),
HandoverReadinessStatus(
HandoverUnreadinessReason::kPendingStreamResetRequest));
// Processing a response to second request.
EXPECT_CALL(producer_, CommitResetStreams()).Times(1);
handler_->HandleReConfig(ReConfigChunk(
Parameters::Builder()
.Add(ReconfigurationResponseParameter(
AddTo(kMyInitialReqSn, 1), ResponseResult::kSuccessPerformed))
.Build()));
// Seconds response has been processed. No pending resets.
EXPECT_TRUE(handler_->GetHandoverReadiness().IsReady());
}
TEST_F(StreamResetHandlerTest, HandoverInInitialState) {
PerformHandover();
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
absl::optional<ReConfigChunk> reconfig = handler_->MakeStreamResetRequest();
ASSERT_TRUE(reconfig.has_value());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
reconfig->parameters().get<OutgoingSSNResetRequestParameter>());
EXPECT_EQ(req.request_sequence_number(), kMyInitialReqSn);
EXPECT_EQ(req.sender_last_assigned_tsn(),
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req.stream_ids(), UnorderedElementsAre(StreamID(42)));
}
TEST_F(StreamResetHandlerTest, HandoverAfterHavingResetOneStream) {
// Reset one stream
{
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_->ResetStreams(std::vector<StreamID>({StreamID(42)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig,
handler_->MakeStreamResetRequest());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
reconfig.parameters().get<OutgoingSSNResetRequestParameter>());
EXPECT_EQ(req.request_sequence_number(), kMyInitialReqSn);
EXPECT_EQ(req.sender_last_assigned_tsn(),
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req.stream_ids(), UnorderedElementsAre(StreamID(42)));
EXPECT_CALL(producer_, CommitResetStreams()).Times(1);
handler_->HandleReConfig(
ReConfigChunk(Parameters::Builder()
.Add(ReconfigurationResponseParameter(
req.request_sequence_number(),
ResponseResult::kSuccessPerformed))
.Build()));
}
PerformHandover();
// Reset another stream after handover
{
EXPECT_CALL(producer_, PrepareResetStreams).Times(1);
handler_->ResetStreams(std::vector<StreamID>({StreamID(43)}));
EXPECT_CALL(producer_, CanResetStreams()).WillOnce(Return(true));
ASSERT_HAS_VALUE_AND_ASSIGN(ReConfigChunk reconfig,
handler_->MakeStreamResetRequest());
ASSERT_HAS_VALUE_AND_ASSIGN(
OutgoingSSNResetRequestParameter req,
reconfig.parameters().get<OutgoingSSNResetRequestParameter>());
EXPECT_EQ(req.request_sequence_number(),
ReconfigRequestSN(kMyInitialReqSn.value() + 1));
EXPECT_EQ(req.sender_last_assigned_tsn(),
TSN(*retransmission_queue_->next_tsn() - 1));
EXPECT_THAT(req.stream_ids(), UnorderedElementsAre(StreamID(43)));
}
}
} // namespace
} // namespace dcsctp