Added explicit EOR to sctp messages and coalesce messages on the receiving side.

TBR=pthatcher@webrtc.org

Bug: webrtc:7774
Change-Id: I41d1cd98d1e7b2ad479177eb2e328a5e2c704824
Reviewed-on: https://webrtc-review.googlesource.com/88900
Commit-Queue: Jeroen de Borst <jeroendb@webrtc.org>
Reviewed-by: Qingsi Wang <qingsi@webrtc.org>
Reviewed-by: Steve Anton <steveanton@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24031}
This commit is contained in:
Jeroen de Borst
2018-07-18 11:25:12 -07:00
committed by Commit Bot
parent 702f4da4b9
commit 4f6d233dcc
5 changed files with 105 additions and 10 deletions

View File

@ -291,22 +291,44 @@ class SctpTransport::UsrSctpWrapper {
// It's neither a notification nor a recognized data packet. Drop it.
RTC_LOG(LS_ERROR) << "Received an unknown PPID " << ppid
<< " on an SCTP packet. Dropping.";
free(data);
} else {
rtc::CopyOnWriteBuffer buffer;
ReceiveDataParams params;
buffer.SetData(reinterpret_cast<uint8_t*>(data), length);
// Expect only continuation messages belonging to the same sid, usrsctp
// ensures this.
RTC_CHECK(transport->partial_message_.size() == 0 ||
rcv.rcv_sid == transport->partial_message_sid_);
transport->partial_message_.AppendData(reinterpret_cast<uint8_t*>(data),
length);
transport->partial_message_sid_ = rcv.rcv_sid;
free(data);
// Merge partial messages until they exceed the maximum send buffer size.
// This enables messages from a single send to be delivered in a single
// callback. Larger messages (originating from other implementations) will
// still be delivered in chunks.
if (!(flags & MSG_EOR) &&
(transport->partial_message_.size() < kSendBufferSize)) {
return 1;
}
params.sid = rcv.rcv_sid;
params.seq_num = rcv.rcv_ssn;
params.timestamp = rcv.rcv_tsn;
params.type = type;
// The ownership of the packet transfers to |invoker_|. Using
// CopyOnWriteBuffer is the most convenient way to do this.
transport->invoker_.AsyncInvoke<void>(
RTC_FROM_HERE, transport->network_thread_,
rtc::Bind(&SctpTransport::OnInboundPacketFromSctpToTransport,
transport, buffer, params, flags));
transport, transport->partial_message_, params, flags));
transport->partial_message_.Clear();
}
free(data);
return 1;
}
@ -489,6 +511,7 @@ bool SctpTransport::SendData(const SendDataParams& params,
spa.sendv_flags |= SCTP_SEND_SNDINFO_VALID;
spa.sendv_sndinfo.snd_sid = params.sid;
spa.sendv_sndinfo.snd_ppid = rtc::HostToNetwork32(GetPpid(params.type));
spa.sendv_sndinfo.snd_flags |= SCTP_EOR;
// Ordered implies reliable.
if (!params.ordered) {
@ -694,6 +717,15 @@ bool SctpTransport::ConfigureSctpSocket() {
return false;
}
// Explicit EOR.
uint32_t eor = 1;
if (usrsctp_setsockopt(sock_, IPPROTO_SCTP, SCTP_EXPLICIT_EOR, &eor,
sizeof(eor))) {
RTC_LOG_ERRNO(LS_ERROR) << debug_name_ << "->ConfigureSctpSocket(): "
<< "Failed to set SCTP_EXPLICIT_EOR.";
return false;
}
// Subscribe to SCTP event notifications.
int event_types[] = {SCTP_ASSOC_CHANGE, SCTP_PEER_ADDR_CHANGE,
SCTP_SEND_FAILED_EVENT, SCTP_SENDER_DRY_EVENT,

View File

@ -142,6 +142,12 @@ class SctpTransport : public SctpTransportInternal,
rtc::AsyncInvoker invoker_;
// Underlying DTLS channel.
rtc::PacketTransportInternal* transport_ = nullptr;
// Track the data received from usrsctp between callbacks until the EOR bit
// arrives.
rtc::CopyOnWriteBuffer partial_message_;
int partial_message_sid_;
bool was_ever_writable_ = false;
int local_port_ = kSctpDefaultPort;
int remote_port_ = kSctpDefaultPort;

View File

@ -47,7 +47,7 @@ using webrtc::SdpSemantics;
namespace {
const int kMaxWait = 10000;
const int kMaxWait = 25000;
} // namespace
@ -139,23 +139,40 @@ class PeerConnectionEndToEndBaseTest : public sigslot::has_slots<>,
// Tests that |dc1| and |dc2| can send to and receive from each other.
void TestDataChannelSendAndReceive(DataChannelInterface* dc1,
DataChannelInterface* dc2) {
DataChannelInterface* dc2,
size_t size = 6) {
std::unique_ptr<webrtc::MockDataChannelObserver> dc1_observer(
new webrtc::MockDataChannelObserver(dc1));
std::unique_ptr<webrtc::MockDataChannelObserver> dc2_observer(
new webrtc::MockDataChannelObserver(dc2));
static const std::string kDummyData = "abcdefg";
webrtc::DataBuffer buffer(kDummyData);
static const std::string kDummyData =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
webrtc::DataBuffer buffer("");
size_t sizeLeft = size;
while (sizeLeft > 0) {
size_t chunkSize =
sizeLeft > kDummyData.length() ? kDummyData.length() : sizeLeft;
buffer.data.AppendData(kDummyData.data(), chunkSize);
sizeLeft -= chunkSize;
}
EXPECT_TRUE(dc1->Send(buffer));
EXPECT_EQ_WAIT(kDummyData, dc2_observer->last_message(), kMaxWait);
EXPECT_EQ_WAIT(buffer.data,
rtc::CopyOnWriteBuffer(dc2_observer->last_message()),
kMaxWait);
EXPECT_TRUE(dc2->Send(buffer));
EXPECT_EQ_WAIT(kDummyData, dc1_observer->last_message(), kMaxWait);
EXPECT_EQ_WAIT(buffer.data,
rtc::CopyOnWriteBuffer(dc1_observer->last_message()),
kMaxWait);
EXPECT_EQ(1U, dc1_observer->received_message_count());
EXPECT_EQ(size, dc1_observer->last_message().length());
EXPECT_EQ(1U, dc2_observer->received_message_count());
EXPECT_EQ(size, dc2_observer->last_message().length());
}
void WaitForDataChannelsToOpen(DataChannelInterface* local_dc,
@ -524,6 +541,40 @@ TEST_P(PeerConnectionEndToEndTest, CreateDataChannelAfterNegotiate) {
CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
}
// Verifies that a DataChannel created can transfer large messages.
TEST_P(PeerConnectionEndToEndTest, CreateDataChannelLargeTransfer) {
CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),
webrtc::MockAudioDecoderFactory::CreateEmptyFactory());
webrtc::DataChannelInit init;
// This DataChannel is for creating the data content in the negotiation.
rtc::scoped_refptr<DataChannelInterface> dummy(
caller_->CreateDataChannel("data", init));
Negotiate();
WaitForConnection();
// Wait for the data channel created pre-negotiation to be opened.
WaitForDataChannelsToOpen(dummy, callee_signaled_data_channels_, 0);
// Create new DataChannels after the negotiation and verify their states.
rtc::scoped_refptr<DataChannelInterface> caller_dc(
caller_->CreateDataChannel("hello", init));
rtc::scoped_refptr<DataChannelInterface> callee_dc(
callee_->CreateDataChannel("hello", init));
WaitForDataChannelsToOpen(caller_dc, callee_signaled_data_channels_, 1);
WaitForDataChannelsToOpen(callee_dc, caller_signaled_data_channels_, 0);
TestDataChannelSendAndReceive(caller_dc, callee_signaled_data_channels_[1],
256 * 1024);
TestDataChannelSendAndReceive(callee_dc, caller_signaled_data_channels_[0],
256 * 1024);
CloseDataChannels(caller_dc, callee_signaled_data_channels_, 1);
CloseDataChannels(callee_dc, caller_signaled_data_channels_, 0);
}
// Verifies that DataChannel IDs are even/odd based on the DTLS roles.
TEST_P(PeerConnectionEndToEndTest, DataChannelIdAssignment) {
CreatePcs(nullptr, webrtc::CreateBuiltinAudioEncoderFactory(),

View File

@ -22,6 +22,9 @@ CopyOnWriteBuffer::CopyOnWriteBuffer(const CopyOnWriteBuffer& buf)
CopyOnWriteBuffer::CopyOnWriteBuffer(CopyOnWriteBuffer&& buf)
: buffer_(std::move(buf.buffer_)) {}
CopyOnWriteBuffer::CopyOnWriteBuffer(const std::string& s)
: CopyOnWriteBuffer(s.data(), s.length()) {}
CopyOnWriteBuffer::CopyOnWriteBuffer(size_t size)
: buffer_(size > 0 ? new RefCountedObject<Buffer>(size) : nullptr) {
RTC_DCHECK(IsConsistent());

View File

@ -31,6 +31,9 @@ class CopyOnWriteBuffer {
// Move contents from an existing buffer.
CopyOnWriteBuffer(CopyOnWriteBuffer&& buf);
// Construct a buffer from a string, convenient for unittests.
CopyOnWriteBuffer(const std::string& s);
// Construct a buffer with the specified number of uninitialized bytes.
explicit CopyOnWriteBuffer(size_t size);
CopyOnWriteBuffer(size_t size, size_t capacity);