dcsctp: Track open channels accurately

In rare cases, it is possible to queue a call to SendData from the signaling
thread on a channel being closed or already closed in the network thread.
By keeping track of currently open streams, we avoid sending messages
with a stream id of channels that the other side already considers closed
and has already reused for a new channel.
This caused rare messages to be delivered on the wrong data channel if
a message was quickly sent, channel closed and a new one reopened.

Bug: webrtc:14277
Change-Id: If35fed8d12d5d2c18cdc6601085d8b632c37a0ba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272624
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37880}
This commit is contained in:
Florent Castelli
2022-08-22 17:46:39 +00:00
committed by WebRTC LUCI CQ
parent 64c70a260e
commit dbc2ba2026
3 changed files with 129 additions and 21 deletions

View File

@ -218,16 +218,17 @@ bool DcSctpTransport::Start(int local_sctp_port,
} }
bool DcSctpTransport::OpenStream(int sid) { bool DcSctpTransport::OpenStream(int sid) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ")."; RTC_DLOG(LS_INFO) << debug_name_ << "->OpenStream(" << sid << ").";
if (!socket_) {
RTC_LOG(LS_ERROR) << debug_name_ << "->OpenStream(sid=" << sid StreamState stream_state;
<< "): Transport is not started."; stream_states_.insert_or_assign(dcsctp::StreamID(static_cast<uint16_t>(sid)),
return false; stream_state);
}
return true; return true;
} }
bool DcSctpTransport::ResetStream(int sid) { bool DcSctpTransport::ResetStream(int sid) {
RTC_DCHECK_RUN_ON(network_thread_);
RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ")."; RTC_DLOG(LS_INFO) << debug_name_ << "->ResetStream(" << sid << ").";
if (!socket_) { if (!socket_) {
RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
@ -237,14 +238,21 @@ bool DcSctpTransport::ResetStream(int sid) {
dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))}; dcsctp::StreamID streams[1] = {dcsctp::StreamID(static_cast<uint16_t>(sid))};
StreamClosingState& closing_state = closing_states_[streams[0]]; auto it = stream_states_.find(streams[0]);
if (closing_state.closure_initiated || closing_state.incoming_reset_done || if (it == stream_states_.end()) {
closing_state.outgoing_reset_done) { RTC_LOG(LS_ERROR) << debug_name_ << "->ResetStream(sid=" << sid
<< "): Stream is not open.";
return false;
}
StreamState& stream_state = it->second;
if (stream_state.closure_initiated || stream_state.incoming_reset_done ||
stream_state.outgoing_reset_done) {
// The closing procedure was already initiated by the remote, don't do // The closing procedure was already initiated by the remote, don't do
// anything. // anything.
return false; return false;
} }
closing_state.closure_initiated = true; stream_state.closure_initiated = true;
socket_->ResetStreams(streams); socket_->ResetStreams(streams);
return true; return true;
} }
@ -265,6 +273,30 @@ bool DcSctpTransport::SendData(int sid,
return false; return false;
} }
// It is possible for a message to be sent from the signaling thread at the
// same time a data-channel is closing, but before the signaling thread is
// aware of it. So we need to keep track of currently active data channels and
// skip sending messages for the ones that are not open or closing.
// The sending errors are not impacting the data channel API contract as
// it is allowed to discard queued messages when the channel is closing.
auto stream_state =
stream_states_.find(dcsctp::StreamID(static_cast<uint16_t>(sid)));
if (stream_state == stream_states_.end()) {
RTC_LOG(LS_VERBOSE) << "Skipping message on non-open stream with sid: "
<< sid;
*result = cricket::SDR_ERROR;
return false;
}
if (stream_state->second.closure_initiated ||
stream_state->second.incoming_reset_done ||
stream_state->second.outgoing_reset_done) {
RTC_LOG(LS_VERBOSE) << "Skipping message on closing stream with sid: "
<< sid;
*result = cricket::SDR_ERROR;
return false;
}
auto max_message_size = socket_->options().max_message_size; auto max_message_size = socket_->options().max_message_size;
if (max_message_size > 0 && payload.size() > max_message_size) { if (max_message_size > 0 && payload.size() > max_message_size) {
RTC_LOG(LS_WARNING) << debug_name_ RTC_LOG(LS_WARNING) << debug_name_
@ -519,16 +551,23 @@ void DcSctpTransport::OnStreamsResetPerformed(
RTC_LOG(LS_INFO) << debug_name_ RTC_LOG(LS_INFO) << debug_name_
<< "->OnStreamsResetPerformed(...): Outgoing stream reset" << "->OnStreamsResetPerformed(...): Outgoing stream reset"
<< ", sid=" << stream_id.value(); << ", sid=" << stream_id.value();
StreamClosingState& closing_state = closing_states_[stream_id];
closing_state.outgoing_reset_done = true;
if (closing_state.incoming_reset_done) { auto it = stream_states_.find(stream_id);
if (it == stream_states_.end()) {
// Ignoring an outgoing stream reset for a closed stream
return;
}
StreamState& stream_state = it->second;
stream_state.outgoing_reset_done = true;
if (stream_state.incoming_reset_done) {
// When the close was not initiated locally, we can signal the end of the // When the close was not initiated locally, we can signal the end of the
// data channel close procedure when the remote ACKs the reset. // data channel close procedure when the remote ACKs the reset.
if (data_channel_sink_) { if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value()); data_channel_sink_->OnChannelClosed(stream_id.value());
} }
closing_states_.erase(stream_id); stream_states_.erase(stream_id);
} }
} }
} }
@ -540,10 +579,15 @@ void DcSctpTransport::OnIncomingStreamsReset(
RTC_LOG(LS_INFO) << debug_name_ RTC_LOG(LS_INFO) << debug_name_
<< "->OnIncomingStreamsReset(...): Incoming stream reset" << "->OnIncomingStreamsReset(...): Incoming stream reset"
<< ", sid=" << stream_id.value(); << ", sid=" << stream_id.value();
StreamClosingState& closing_state = closing_states_[stream_id];
closing_state.incoming_reset_done = true;
if (!closing_state.closure_initiated) { auto it = stream_states_.find(stream_id);
if (it == stream_states_.end())
return;
StreamState& stream_state = it->second;
stream_state.incoming_reset_done = true;
if (!stream_state.closure_initiated) {
// When receiving an incoming stream reset event for a non local close // When receiving an incoming stream reset event for a non local close
// procedure, the transport needs to reset the stream in the other // procedure, the transport needs to reset the stream in the other
// direction too. // direction too.
@ -554,13 +598,13 @@ void DcSctpTransport::OnIncomingStreamsReset(
} }
} }
if (closing_state.outgoing_reset_done) { if (stream_state.outgoing_reset_done) {
// The close procedure that was initiated locally is complete when we // The close procedure that was initiated locally is complete when we
// receive and incoming reset event. // receive and incoming reset event.
if (data_channel_sink_) { if (data_channel_sink_) {
data_channel_sink_->OnChannelClosed(stream_id.value()); data_channel_sink_->OnChannelClosed(stream_id.value());
} }
closing_states_.erase(stream_id); stream_states_.erase(stream_id);
} }
} }
} }

View File

@ -114,10 +114,10 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
std::string debug_name_ = "DcSctpTransport"; std::string debug_name_ = "DcSctpTransport";
rtc::CopyOnWriteBuffer receive_buffer_; rtc::CopyOnWriteBuffer receive_buffer_;
// Used to keep track of the closing state of the data channel. // Used to keep track of the state of data channels.
// Reset needs to happen both ways before signaling the transport // Reset needs to happen both ways before signaling the transport
// is closed. // is closed.
struct StreamClosingState { struct StreamState {
// True when the local connection has initiated the reset. // True when the local connection has initiated the reset.
// If a connection receives a reset for a stream that isn't // If a connection receives a reset for a stream that isn't
// already being reset locally, it needs to fire the signal // already being reset locally, it needs to fire the signal
@ -129,7 +129,9 @@ class DcSctpTransport : public cricket::SctpTransportInternal,
bool outgoing_reset_done = false; bool outgoing_reset_done = false;
}; };
flat_map<dcsctp::StreamID, StreamClosingState> closing_states_; // Map of all currently open or closing data channels
flat_map<dcsctp::StreamID, StreamState> stream_states_
RTC_GUARDED_BY(network_thread_);
bool ready_to_send_data_ = false; bool ready_to_send_data_ = false;
std::function<void()> on_connected_callback_ RTC_GUARDED_BY(network_thread_); std::function<void()> on_connected_callback_ RTC_GUARDED_BY(network_thread_);
DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr; DataChannelSink* data_channel_sink_ RTC_GUARDED_BY(network_thread_) = nullptr;

View File

@ -18,6 +18,7 @@
#include "p2p/base/fake_packet_transport.h" #include "p2p/base/fake_packet_transport.h"
#include "test/gtest.h" #include "test/gtest.h"
using ::testing::_;
using ::testing::ByMove; using ::testing::ByMove;
using ::testing::DoAll; using ::testing::DoAll;
using ::testing::ElementsAre; using ::testing::ElementsAre;
@ -25,6 +26,7 @@ using ::testing::InSequence;
using ::testing::Invoke; using ::testing::Invoke;
using ::testing::NiceMock; using ::testing::NiceMock;
using ::testing::Return; using ::testing::Return;
using ::testing::ReturnPointee;
namespace webrtc { namespace webrtc {
@ -112,6 +114,7 @@ TEST(DcSctpTransportTest, CloseSequence) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->OpenStream(1);
peer_b.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->ResetStream(1); peer_a.sctp_transport_->ResetStream(1);
// Simulate the callbacks from the stream resets // Simulate the callbacks from the stream resets
@ -153,6 +156,7 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024); peer_b.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->OpenStream(1); peer_a.sctp_transport_->OpenStream(1);
peer_b.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->ResetStream(1); peer_a.sctp_transport_->ResetStream(1);
peer_b.sctp_transport_->ResetStream(1); peer_b.sctp_transport_->ResetStream(1);
@ -168,4 +172,62 @@ TEST(DcSctpTransportTest, CloseSequenceSimultaneous) {
->OnIncomingStreamsReset(streams); ->OnIncomingStreamsReset(streams);
} }
TEST(DcSctpTransportTest, DiscardMessageClosedChannel) {
rtc::AutoThread main_thread;
Peer peer_a;
EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0);
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_FALSE(send_data_return);
EXPECT_EQ(cricket::SDR_ERROR, result);
}
TEST(DcSctpTransportTest, DiscardMessageClosingChannel) {
rtc::AutoThread main_thread;
Peer peer_a;
EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(0);
peer_a.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
peer_a.sctp_transport_->ResetStream(1);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_FALSE(send_data_return);
EXPECT_EQ(cricket::SDR_ERROR, result);
}
TEST(DcSctpTransportTest, SendDataOpenChannel) {
rtc::AutoThread main_thread;
Peer peer_a;
dcsctp::DcSctpOptions options;
EXPECT_CALL(*peer_a.socket_, Send(_, _)).Times(1);
EXPECT_CALL(*peer_a.socket_, options()).WillOnce(ReturnPointee(&options));
peer_a.sctp_transport_->OpenStream(1);
peer_a.sctp_transport_->Start(5000, 5000, 256 * 1024);
cricket::SendDataResult result;
SendDataParams params;
rtc::CopyOnWriteBuffer payload;
bool send_data_return =
peer_a.sctp_transport_->SendData(1, params, payload, &result);
EXPECT_TRUE(send_data_return);
EXPECT_EQ(cricket::SDR_SUCCESS, result);
}
} // namespace webrtc } // namespace webrtc