From 4f6d233dccdd7a40f686e6ad4b4799cb8637aaab Mon Sep 17 00:00:00 2001 From: Jeroen de Borst Date: Wed, 18 Jul 2018 11:25:12 -0700 Subject: [PATCH] Added explicit EOR to sctp messages and coalesce messages on the receiving side. TBR=pthatcher@webrtc.org Bug: webrtc:7774 Change-Id: I41d1cd98d1e7b2ad479177eb2e328a5e2c704824 Reviewed-on: https://webrtc-review.googlesource.com/88900 Commit-Queue: Jeroen de Borst Reviewed-by: Qingsi Wang Reviewed-by: Steve Anton Cr-Commit-Position: refs/heads/master@{#24031} --- media/sctp/sctptransport.cc | 40 +++++++++++++++-- media/sctp/sctptransport.h | 6 +++ pc/peerconnectionendtoend_unittest.cc | 63 ++++++++++++++++++++++++--- rtc_base/copyonwritebuffer.cc | 3 ++ rtc_base/copyonwritebuffer.h | 3 ++ 5 files changed, 105 insertions(+), 10 deletions(-) diff --git a/media/sctp/sctptransport.cc b/media/sctp/sctptransport.cc index 967e315be5..eaaf63fb3a 100644 --- a/media/sctp/sctptransport.cc +++ b/media/sctp/sctptransport.cc @@ -291,22 +291,44 @@ class SctpTransport::UsrSctpWrapper { // It's neither a notification nor a recognized data packet. Drop it. RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid << " on an SCTP packet. Dropping."; + free(data); } else { - rtc::CopyOnWriteBuffer buffer; ReceiveDataParams params; - buffer.SetData(reinterpret_cast(data), length); + + // Expect only continuation messages belonging to the same sid, usrsctp + // ensures this. + RTC_CHECK(transport->partial_message_.size() == 0 || + rcv.rcv_sid == transport->partial_message_sid_); + + transport->partial_message_.AppendData(reinterpret_cast(data), + length); + transport->partial_message_sid_ = rcv.rcv_sid; + + free(data); + + // Merge partial messages until they exceed the maximum send buffer size. + // This enables messages from a single send to be delivered in a single + // callback. Larger messages (originating from other implementations) will + // still be delivered in chunks. + if (!(flags & MSG_EOR) && + (transport->partial_message_.size() < kSendBufferSize)) { + return 1; + } + params.sid = rcv.rcv_sid; params.seq_num = rcv.rcv_ssn; params.timestamp = rcv.rcv_tsn; params.type = type; + // The ownership of the packet transfers to |invoker_|. Using // CopyOnWriteBuffer is the most convenient way to do this. transport->invoker_.AsyncInvoke( RTC_FROM_HERE, transport->network_thread_, rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport, - transport, buffer, params, flags)); + transport, transport->partial_message_, params, flags)); + + transport->partial_message_.Clear(); } - free(data); return 1; } @@ -489,6 +511,7 @@ bool SctpTransport::SendData(const SendDataParams& params, spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID; spa.sendv_sndinfo.snd_sid = params.sid; spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type)); + spa.sendv_sndinfo.snd_flags |= SCTP_EOR; // Ordered implies reliable. if (!params.ordered) { @@ -694,6 +717,15 @@ bool SctpTransport::ConfigureSctpSocket() { return false; } + // Explicit EOR. + uint32_t eor = 1; + if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &eor, + sizeof(eor))) { + RTC_LOG_ERRNO(LS_ERROR) << debug_name_ << "->ConfigureSctpSocket(): " + << "Failed to set SCTP_EXPLICIT_EOR."; + return false; + } + // Subscribe to SCTP event notifications. int event_types[] = {SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE, SCTP_SEND_FAILED_EVENT, SCTP_SENDER_DRY_EVENT, diff --git a/media/sctp/sctptransport.h b/media/sctp/sctptransport.h index b2d7084d9e..aad060ad9d 100644 --- a/media/sctp/sctptransport.h +++ b/media/sctp/sctptransport.h @@ -142,6 +142,12 @@ class SctpTransport : public SctpTransportInternal, rtc::AsyncInvoker invoker_; // Underlying DTLS channel. rtc::PacketTransportInternal* transport_ = nullptr; + + // Track the data received from usrsctp between callbacks until the EOR bit + // arrives. + rtc::CopyOnWriteBuffer partial_message_; + int partial_message_sid_; + bool was_ever_writable_ = false; int local_port_ = kSctpDefaultPort; int remote_port_ = kSctpDefaultPort; diff --git a/pc/peerconnectionendtoend_unittest.cc b/pc/peerconnectionendtoend_unittest.cc index 54d967f713..7a8c2dbc12 100644 --- a/pc/peerconnectionendtoend_unittest.cc +++ b/pc/peerconnectionendtoend_unittest.cc @@ -47,7 +47,7 @@ using webrtc::SdpSemantics; namespace { -const int kMaxWait = 10000; +const int kMaxWait = 25000; } // namespace @@ -139,23 +139,40 @@ class PeerConnectionEndToEndBaseTest : public sigslot::has_slots<>, // Tests that |dc1| and |dc2| can send to and receive from each other. void TestDataChannelSendAndReceive(DataChannelInterface* dc1, - DataChannelInterface* dc2) { + DataChannelInterface* dc2, + size_t size = 6) { std::unique_ptr dc1_observer( new webrtc::MockDataChannelObserver(dc1)); std::unique_ptr dc2_observer( new webrtc::MockDataChannelObserver(dc2)); - static const std::string kDummyData = "abcdefg"; - webrtc::DataBuffer buffer(kDummyData); + static const std::string kDummyData = + "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; + webrtc::DataBuffer buffer(""); + + size_t sizeLeft = size; + while (sizeLeft > 0) { + size_t chunkSize = + sizeLeft > kDummyData.length() ? kDummyData.length() : sizeLeft; + buffer.data.AppendData(kDummyData.data(), chunkSize); + sizeLeft -= chunkSize; + } + EXPECT_TRUE(dc1->Send(buffer)); - EXPECT_EQ_WAIT(kDummyData, dc2_observer->last_message(), kMaxWait); + EXPECT_EQ_WAIT(buffer.data, + rtc::CopyOnWriteBuffer(dc2_observer->last_message()), + kMaxWait); EXPECT_TRUE(dc2->Send(buffer)); - EXPECT_EQ_WAIT(kDummyData, dc1_observer->last_message(), kMaxWait); + EXPECT_EQ_WAIT(buffer.data, + rtc::CopyOnWriteBuffer(dc1_observer->last_message()), + kMaxWait); EXPECT_EQ(1U, dc1_observer->received_message_count()); + EXPECT_EQ(size, dc1_observer->last_message().length()); EXPECT_EQ(1U, dc2_observer->received_message_count()); + EXPECT_EQ(size, dc2_observer->last_message().length()); } void WaitForDataChannelsToOpen(DataChannelInterface* local_dc, @@ -524,6 +541,40 @@ TEST_P(PeerConnectionEndToEndTest, CreateDataChannelAfterNegotiate) { CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0); } +// Verifies that a DataChannel created can transfer large messages. +TEST_P(PeerConnectionEndToEndTest, CreateDataChannelLargeTransfer) { + CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(), + webrtc::MockAudioDecoderFactory::CreateEmptyFactory()); + + webrtc::DataChannelInit init; + + // This DataChannel is for creating the data content in the negotiation. + rtc::scoped_refptr dummy( + caller_->CreateDataChannel("data", init)); + Negotiate(); + WaitForConnection(); + + // Wait for the data channel created pre-negotiation to be opened. + WaitForDataChannelsToOpen(dummy, callee_signaled_data_channels_, 0); + + // Create new DataChannels after the negotiation and verify their states. + rtc::scoped_refptr caller_dc( + caller_->CreateDataChannel("hello", init)); + rtc::scoped_refptr callee_dc( + callee_->CreateDataChannel("hello", init)); + + WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1); + WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0); + + TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1], + 256 * 1024); + TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0], + 256 * 1024); + + CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1); + CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0); +} + // Verifies that DataChannel IDs are even/odd based on the DTLS roles. TEST_P(PeerConnectionEndToEndTest, DataChannelIdAssignment) { CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(), diff --git a/rtc_base/copyonwritebuffer.cc b/rtc_base/copyonwritebuffer.cc index 8874ea9187..6c48d52f17 100644 --- a/rtc_base/copyonwritebuffer.cc +++ b/rtc_base/copyonwritebuffer.cc @@ -22,6 +22,9 @@ CopyOnWriteBuffer::CopyOnWriteBuffer(const CopyOnWriteBuffer& buf) CopyOnWriteBuffer::CopyOnWriteBuffer(CopyOnWriteBuffer&& buf) : buffer_(std::move(buf.buffer_)) {} +CopyOnWriteBuffer::CopyOnWriteBuffer(const std::string& s) + : CopyOnWriteBuffer(s.data(), s.length()) {} + CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size) : buffer_(size > 0 ? new RefCountedObject(size) : nullptr) { RTC_DCHECK(IsConsistent()); diff --git a/rtc_base/copyonwritebuffer.h b/rtc_base/copyonwritebuffer.h index 467baad878..0514e2fe66 100644 --- a/rtc_base/copyonwritebuffer.h +++ b/rtc_base/copyonwritebuffer.h @@ -31,6 +31,9 @@ class CopyOnWriteBuffer { // Move contents from an existing buffer. CopyOnWriteBuffer(CopyOnWriteBuffer&& buf); + // Construct a buffer from a string, convenient for unittests. + CopyOnWriteBuffer(const std::string& s); + // Construct a buffer with the specified number of uninitialized bytes. explicit CopyOnWriteBuffer(size_t size); CopyOnWriteBuffer(size_t size, size_t capacity);