usrsctp: Support sending and receiving empty messages

Add new PPIDs 56 and 57. When sending an empty message,
we use the corresponding PPID with a single byte data chunk.
On the receiving side, when detecting such a PPID, we just
ignore the payload content.

Bug: webrtc:12697
Change-Id: I6af481e7281db10d9663e1c0aaf97b3e608432a1
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/215931
Commit-Queue: Florent Castelli <orphis@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33808}
This commit is contained in:
Florent Castelli
2021-04-22 13:32:39 +02:00
committed by Commit Bot
parent 9bd2457857
commit 88f4b33196
5 changed files with 117 additions and 36 deletions

View File

@ -74,24 +74,25 @@ static constexpr size_t kSctpMtu = 1191;
ABSL_CONST_INIT int g_usrsctp_usage_count = 0; ABSL_CONST_INIT int g_usrsctp_usage_count = 0;
ABSL_CONST_INIT bool g_usrsctp_initialized_ = false; ABSL_CONST_INIT bool g_usrsctp_initialized_ = false;
ABSL_CONST_INIT webrtc::GlobalMutex g_usrsctp_lock_(absl::kConstInit); ABSL_CONST_INIT webrtc::GlobalMutex g_usrsctp_lock_(absl::kConstInit);
ABSL_CONST_INIT char kZero[] = {'\0'};
// DataMessageType is used for the SCTP "Payload Protocol Identifier", as // DataMessageType is used for the SCTP "Payload Protocol Identifier", as
// defined in http://tools.ietf.org/html/rfc4960#section-14.4 // defined in http://tools.ietf.org/html/rfc4960#section-14.4
// //
// For the list of IANA approved values see: // For the list of IANA approved values see:
// https://tools.ietf.org/html/rfc8831 Sec. 8
// http://www.iana.org/assignments/sctp-parameters/sctp-parameters.xml // http://www.iana.org/assignments/sctp-parameters/sctp-parameters.xml
// The value is not used by SCTP itself. It indicates the protocol running // The value is not used by SCTP itself. It indicates the protocol running
// on top of SCTP. // on top of SCTP.
enum { enum {
PPID_NONE = 0, // No protocol is specified. PPID_NONE = 0, // No protocol is specified.
// Matches the PPIDs in mozilla source and
// https://datatracker.ietf.org/doc/draft-ietf-rtcweb-data-protocol Sec. 9
// They're not yet assigned by IANA.
PPID_CONTROL = 50, PPID_CONTROL = 50,
PPID_BINARY_PARTIAL = 52, PPID_TEXT_LAST = 51,
PPID_BINARY_PARTIAL = 52, // Deprecated
PPID_BINARY_LAST = 53, PPID_BINARY_LAST = 53,
PPID_TEXT_PARTIAL = 54, PPID_TEXT_PARTIAL = 54, // Deprecated
PPID_TEXT_LAST = 51 PPID_TEXT_EMPTY = 56,
PPID_BINARY_EMPTY = 57,
}; };
// Should only be modified by UsrSctpWrapper. // Should only be modified by UsrSctpWrapper.
@ -128,7 +129,7 @@ void DebugSctpPrintf(const char* format, ...) {
} }
// Get the PPID to use for the terminating fragment of this type. // Get the PPID to use for the terminating fragment of this type.
uint32_t GetPpid(cricket::DataMessageType type) { uint32_t GetPpid(cricket::DataMessageType type, size_t size) {
switch (type) { switch (type) {
default: default:
case cricket::DMT_NONE: case cricket::DMT_NONE:
@ -136,9 +137,9 @@ uint32_t GetPpid(cricket::DataMessageType type) {
case cricket::DMT_CONTROL: case cricket::DMT_CONTROL:
return PPID_CONTROL; return PPID_CONTROL;
case cricket::DMT_BINARY: case cricket::DMT_BINARY:
return PPID_BINARY_LAST; return size > 0 ? PPID_BINARY_LAST : PPID_BINARY_EMPTY;
case cricket::DMT_TEXT: case cricket::DMT_TEXT:
return PPID_TEXT_LAST; return size > 0 ? PPID_TEXT_LAST : PPID_TEXT_EMPTY;
} }
} }
@ -147,11 +148,13 @@ bool GetDataMediaType(uint32_t ppid, cricket::DataMessageType* dest) {
switch (ppid) { switch (ppid) {
case PPID_BINARY_PARTIAL: case PPID_BINARY_PARTIAL:
case PPID_BINARY_LAST: case PPID_BINARY_LAST:
case PPID_BINARY_EMPTY:
*dest = cricket::DMT_BINARY; *dest = cricket::DMT_BINARY;
return true; return true;
case PPID_TEXT_PARTIAL: case PPID_TEXT_PARTIAL:
case PPID_TEXT_LAST: case PPID_TEXT_LAST:
case PPID_TEXT_EMPTY:
*dest = cricket::DMT_TEXT; *dest = cricket::DMT_TEXT;
return true; return true;
@ -168,6 +171,10 @@ bool GetDataMediaType(uint32_t ppid, cricket::DataMessageType* dest) {
} }
} }
bool IsEmptyPPID(uint32_t ppid) {
return ppid == PPID_BINARY_EMPTY || ppid == PPID_TEXT_EMPTY;
}
// Log the packet in text2pcap format, if log level is at LS_VERBOSE. // Log the packet in text2pcap format, if log level is at LS_VERBOSE.
// //
// In order to turn these logs into a pcap file you can use, first filter the // In order to turn these logs into a pcap file you can use, first filter the
@ -205,11 +212,12 @@ void VerboseLogPacket(const void* data, size_t length, int direction) {
// Creates the sctp_sendv_spa struct used for setting flags in the // Creates the sctp_sendv_spa struct used for setting flags in the
// sctp_sendv() call. // sctp_sendv() call.
sctp_sendv_spa CreateSctpSendParams(const cricket::SendDataParams& params) { sctp_sendv_spa CreateSctpSendParams(const cricket::SendDataParams& params,
size_t size) {
struct sctp_sendv_spa spa = {0}; struct sctp_sendv_spa spa = {0};
spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID; spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
spa.sendv_sndinfo.snd_sid = params.sid; spa.sendv_sndinfo.snd_sid = params.sid;
spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type)); spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type, size));
// Explicitly marking the EOR flag turns the usrsctp_sendv call below into a // Explicitly marking the EOR flag turns the usrsctp_sendv call below into a
// non atomic operation. This means that the sctp lib might only accept the // non atomic operation. This means that the sctp lib might only accept the
// message partially. This is done in order to improve throughput, so that we // message partially. This is done in order to improve throughput, so that we
@ -792,13 +800,23 @@ SendDataResult UsrsctpTransport::SendMessageInternal(OutgoingMessage* message) {
} }
// Send data using SCTP. // Send data using SCTP.
sctp_sendv_spa spa = CreateSctpSendParams(message->send_params()); sctp_sendv_spa spa =
CreateSctpSendParams(message->send_params(), message->size());
const void* data = message->data();
size_t data_length = message->size();
if (message->size() == 0) {
// Empty messages are replaced by a single NUL byte on the wire as SCTP
// doesn't support empty messages.
// The PPID carries the information that the payload needs to be ignored.
data = kZero;
data_length = 1;
}
// Note: this send call is not atomic because the EOR bit is set. This means // Note: this send call is not atomic because the EOR bit is set. This means
// that usrsctp can partially accept this message and it is our duty to buffer // that usrsctp can partially accept this message and it is our duty to buffer
// the rest. // the rest.
ssize_t send_res = usrsctp_sendv( ssize_t send_res = usrsctp_sendv(sock_, data, data_length, NULL, 0, &spa,
sock_, message->data(), message->size(), NULL, 0, &spa, rtc::checked_cast<socklen_t>(sizeof(spa)),
rtc::checked_cast<socklen_t>(sizeof(spa)), SCTP_SENDV_SPA, 0); SCTP_SENDV_SPA, 0);
if (send_res < 0) { if (send_res < 0) {
if (errno == SCTP_EWOULDBLOCK) { if (errno == SCTP_EWOULDBLOCK) {
ready_to_send_data_ = false; ready_to_send_data_ = false;
@ -814,7 +832,8 @@ SendDataResult UsrsctpTransport::SendMessageInternal(OutgoingMessage* message) {
} }
size_t amount_sent = static_cast<size_t>(send_res); size_t amount_sent = static_cast<size_t>(send_res);
RTC_DCHECK_LE(amount_sent, message->size()); RTC_DCHECK_LE(amount_sent, data_length);
if (message->size() != 0)
message->Advance(amount_sent); message->Advance(amount_sent);
// Only way out now is success. // Only way out now is success.
return SDR_SUCCESS; return SDR_SUCCESS;
@ -1319,7 +1338,10 @@ void UsrsctpTransport::OnDataOrNotificationFromSctp(const void* data,
// association. // association.
params.seq_num = rcv.rcv_ssn; params.seq_num = rcv.rcv_ssn;
// Append the chunk's data to the message buffer // Append the chunk's data to the message buffer unless we have a chunk with a
// PPID marking an empty message.
// See: https://tools.ietf.org/html/rfc8831#section-6.6
if (!IsEmptyPPID(ppid))
partial_incoming_message_.AppendData(reinterpret_cast<const uint8_t*>(data), partial_incoming_message_.AppendData(reinterpret_cast<const uint8_t*>(data),
length); length);
partial_params_ = params; partial_params_ = params;

View File

@ -218,6 +218,52 @@ TEST_P(DataChannelIntegrationTest,
} }
} }
// This test sets up a call between two parties with an SCTP
// data channel only, and sends empty messages
TEST_P(DataChannelIntegrationTest,
EndToEndCallWithSctpDataChannelEmptyMessages) {
ASSERT_TRUE(CreatePeerConnectionWrappers());
ConnectFakeSignaling();
// Expect that data channel created on caller side will show up for callee as
// well.
caller()->CreateDataChannel();
caller()->CreateAndSetAndSignalOffer();
ASSERT_TRUE_WAIT(SignalingStateStable(), kDefaultTimeout);
// Caller data channel should already exist (it created one). Callee data
// channel may not exist yet, since negotiation happens in-band, not in SDP.
ASSERT_NE(nullptr, caller()->data_channel());
ASSERT_TRUE_WAIT(callee()->data_channel() != nullptr, kDefaultTimeout);
EXPECT_TRUE_WAIT(caller()->data_observer()->IsOpen(), kDefaultTimeout);
EXPECT_TRUE_WAIT(callee()->data_observer()->IsOpen(), kDefaultTimeout);
// Ensure data can be sent in both directions.
// Sending empty string data
std::string data = "";
caller()->data_channel()->Send(DataBuffer(data));
EXPECT_EQ_WAIT(1u, callee()->data_observer()->received_message_count(),
kDefaultTimeout);
EXPECT_TRUE(callee()->data_observer()->last_message().empty());
EXPECT_FALSE(callee()->data_observer()->messages().back().binary);
callee()->data_channel()->Send(DataBuffer(data));
EXPECT_EQ_WAIT(1u, caller()->data_observer()->received_message_count(),
kDefaultTimeout);
EXPECT_TRUE(caller()->data_observer()->last_message().empty());
EXPECT_FALSE(caller()->data_observer()->messages().back().binary);
// Sending empty binary data
rtc::CopyOnWriteBuffer empty_buffer;
caller()->data_channel()->Send(DataBuffer(empty_buffer, true));
EXPECT_EQ_WAIT(2u, callee()->data_observer()->received_message_count(),
kDefaultTimeout);
EXPECT_TRUE(callee()->data_observer()->last_message().empty());
EXPECT_TRUE(callee()->data_observer()->messages().back().binary);
callee()->data_channel()->Send(DataBuffer(empty_buffer, true));
EXPECT_EQ_WAIT(2u, caller()->data_observer()->received_message_count(),
kDefaultTimeout);
EXPECT_TRUE(caller()->data_observer()->last_message().empty());
EXPECT_TRUE(caller()->data_observer()->messages().back().binary);
}
TEST_P(DataChannelIntegrationTest, TEST_P(DataChannelIntegrationTest,
EndToEndCallWithSctpDataChannelLowestSafeMtu) { EndToEndCallWithSctpDataChannelLowestSafeMtu) {
// The lowest payload size limit that's tested and found safe for this // The lowest payload size limit that's tested and found safe for this
@ -386,10 +432,16 @@ TEST_P(DataChannelIntegrationTest, StressTestUnorderedSctpDataChannel) {
kDefaultTimeout); kDefaultTimeout);
// Sort and compare to make sure none of the messages were corrupted. // Sort and compare to make sure none of the messages were corrupted.
std::vector<std::string> caller_received_messages = std::vector<std::string> caller_received_messages;
caller()->data_observer()->messages(); absl::c_transform(caller()->data_observer()->messages(),
std::vector<std::string> callee_received_messages = std::back_inserter(caller_received_messages),
callee()->data_observer()->messages(); [](const auto& a) { return a.data; });
std::vector<std::string> callee_received_messages;
absl::c_transform(callee()->data_observer()->messages(),
std::back_inserter(callee_received_messages),
[](const auto& a) { return a.data; });
absl::c_sort(sent_messages); absl::c_sort(sent_messages);
absl::c_sort(caller_received_messages); absl::c_sort(caller_received_messages);
absl::c_sort(callee_received_messages); absl::c_sort(callee_received_messages);

View File

@ -294,13 +294,6 @@ bool SctpDataChannel::Send(const DataBuffer& buffer) {
return false; return false;
} }
// TODO(jiayl): the spec is unclear about if the remote side should get the
// onmessage event. We need to figure out the expected behavior and change the
// code accordingly.
if (buffer.size() == 0) {
return true;
}
buffered_amount_ += buffer.size(); buffered_amount_ += buffer.size();
// If the queue is non-empty, we're waiting for SignalReadyToSend, // If the queue is non-empty, we're waiting for SignalReadyToSend,

View File

@ -36,7 +36,7 @@ class FakeDataChannelProvider
return false; return false;
} }
if (transport_error_ || payload.size() == 0) { if (transport_error_) {
*result = cricket::SDR_ERROR; *result = cricket::SDR_ERROR;
return false; return false;
} }

View File

@ -351,6 +351,11 @@ class FakeSetRemoteDescriptionObserver
class MockDataChannelObserver : public webrtc::DataChannelObserver { class MockDataChannelObserver : public webrtc::DataChannelObserver {
public: public:
struct Message {
std::string data;
bool binary;
};
explicit MockDataChannelObserver(webrtc::DataChannelInterface* channel) explicit MockDataChannelObserver(webrtc::DataChannelInterface* channel)
: channel_(channel) { : channel_(channel) {
channel_->RegisterObserver(this); channel_->RegisterObserver(this);
@ -363,20 +368,29 @@ class MockDataChannelObserver : public webrtc::DataChannelObserver {
void OnStateChange() override { state_ = channel_->state(); } void OnStateChange() override { state_ = channel_->state(); }
void OnMessage(const DataBuffer& buffer) override { void OnMessage(const DataBuffer& buffer) override {
messages_.push_back( messages_.push_back(
std::string(buffer.data.data<char>(), buffer.data.size())); {std::string(buffer.data.data<char>(), buffer.data.size()),
buffer.binary});
} }
bool IsOpen() const { return state_ == DataChannelInterface::kOpen; } bool IsOpen() const { return state_ == DataChannelInterface::kOpen; }
std::vector<std::string> messages() const { return messages_; } std::vector<Message> messages() const { return messages_; }
std::string last_message() const { std::string last_message() const {
return messages_.empty() ? std::string() : messages_.back(); if (messages_.empty())
return {};
return messages_.back().data;
}
bool last_message_is_binary() const {
if (messages_.empty())
return false;
return messages_.back().binary;
} }
size_t received_message_count() const { return messages_.size(); } size_t received_message_count() const { return messages_.size(); }
private: private:
rtc::scoped_refptr<webrtc::DataChannelInterface> channel_; rtc::scoped_refptr<webrtc::DataChannelInterface> channel_;
DataChannelInterface::DataState state_; DataChannelInterface::DataState state_;
std::vector<std::string> messages_; std::vector<Message> messages_;
}; };
class MockStatsObserver : public webrtc::StatsObserver { class MockStatsObserver : public webrtc::StatsObserver {