From e448a3fb54eec298d9113b2bd85e71e2fe69bb77 Mon Sep 17 00:00:00 2001 From: Marina Ciocea Date: Mon, 4 Mar 2019 15:52:21 +0100 Subject: [PATCH] Update DataChannel bufferedamount implementation. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Call DataChannelObserver::OnBufferedAmountChange on each successful send. Previously, the observer would get notified of buffered amount changes only when queued send data is consumed. Data gets queued only if it cannot be sent right away. According to the WebRTC standard[1], bufferedamount should be increased before each sent and decreased after each successful sent. Update implementation to be standard compliant. Design doc: http://doc/1lorHBn-GMn5U0T0RQANxrsW0pXhw8XGZM-xZyVUOW90 [1] https://w3c.github.io/webrtc-pc/#dom-datachannel-bufferedamount Bug: chromium:878682 Change-Id: Ife009d30c4a18dced9a54cf600a445bb1f02561d Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/123237 Commit-Queue: Marina Ciocea Reviewed-by: Harald Alvestrand Reviewed-by: Karl Wiberg Reviewed-by: Henrik Boström Cr-Commit-Position: refs/heads/master@{#27057} --- api/data_channel_interface.h | 2 +- pc/data_channel.cc | 25 ++++++++++++------------- pc/data_channel.h | 3 +++ pc/data_channel_unittest.cc | 22 +++++++++++++++------- 4 files changed, 31 insertions(+), 21 deletions(-) diff --git a/api/data_channel_interface.h b/api/data_channel_interface.h index 91a9804003..1bd874e936 100644 --- a/api/data_channel_interface.h +++ b/api/data_channel_interface.h @@ -87,7 +87,7 @@ class DataChannelObserver { // A data buffer was successfully received. virtual void OnMessage(const DataBuffer& buffer) = 0; // The data channel's buffered_amount has changed. - virtual void OnBufferedAmountChange(uint64_t previous_amount) {} + virtual void OnBufferedAmountChange(uint64_t sent_data_size) {} protected: virtual ~DataChannelObserver() = default; diff --git a/pc/data_channel.cc b/pc/data_channel.cc index f854defd1b..e4727f25de 100644 --- a/pc/data_channel.cc +++ b/pc/data_channel.cc @@ -127,6 +127,7 @@ DataChannel::DataChannel(DataChannelProviderInterface* provider, bytes_sent_(0), messages_received_(0), bytes_received_(0), + buffered_amount_(0), data_channel_type_(dct), provider_(provider), handshake_state_(kHandshakeInit), @@ -210,7 +211,7 @@ bool DataChannel::reliable() const { } uint64_t DataChannel::buffered_amount() const { - return queued_send_data_.byte_count(); + return buffered_amount_; } void DataChannel::Close() { @@ -224,6 +225,7 @@ void DataChannel::Close() { } bool DataChannel::Send(const DataBuffer& buffer) { + buffered_amount_ += buffer.size(); if (state_ != kOpen) { return false; } @@ -429,6 +431,7 @@ void DataChannel::CloseAbruptly() { // Closing abruptly means any queued data gets thrown away. queued_send_data_.Clear(); + buffered_amount_ = 0; queued_control_data_.Clear(); // Still go to "kClosing" before "kClosed", since observers may be expecting @@ -548,7 +551,6 @@ void DataChannel::SendQueuedDataMessages() { RTC_DCHECK(state_ == kOpen || state_ == kClosing); - uint64_t start_buffered_amount = buffered_amount(); while (!queued_send_data_.Empty()) { std::unique_ptr buffer = queued_send_data_.PopFront(); if (!SendDataMessage(*buffer, false)) { @@ -557,10 +559,6 @@ void DataChannel::SendQueuedDataMessages() { break; } } - - if (observer_ && buffered_amount() < start_buffered_amount) { - observer_->OnBufferedAmountChange(start_buffered_amount); - } } bool DataChannel::SendDataMessage(const DataBuffer& buffer, @@ -591,6 +589,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, if (success) { ++messages_sent_; bytes_sent_ += buffer.size(); + + RTC_DCHECK(buffered_amount_ >= buffer.size()); + buffered_amount_ -= buffer.size(); + if (observer_ && buffer.size() > 0) { + observer_->OnBufferedAmountChange(buffer.size()); + } return true; } @@ -614,17 +618,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer, } bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { - size_t start_buffered_amount = buffered_amount(); - if (start_buffered_amount >= kMaxQueuedSendDataBytes) { + size_t start_buffered_amount = queued_send_data_.byte_count(); + if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) { RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; return false; } queued_send_data_.PushBack(absl::make_unique(buffer)); - - // The buffer can have length zero, in which case there is no change. - if (observer_ && buffered_amount() > start_buffered_amount) { - observer_->OnBufferedAmountChange(start_buffered_amount); - } return true; } diff --git a/pc/data_channel.h b/pc/data_channel.h index a07c4fb57b..fa5c0f5f43 100644 --- a/pc/data_channel.h +++ b/pc/data_channel.h @@ -269,6 +269,9 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> { uint64_t bytes_sent_; uint32_t messages_received_; uint64_t bytes_received_; + // Number of bytes of data that have been queued using Send(). Increased + // before each transport send and decreased after each successful send. + uint64_t buffered_amount_; cricket::DataChannelType data_channel_type_; DataChannelProviderInterface* provider_; HandshakeState handshake_state_; diff --git a/pc/data_channel_unittest.cc b/pc/data_channel_unittest.cc index 4aa9dee787..7ce40fb088 100644 --- a/pc/data_channel_unittest.cc +++ b/pc/data_channel_unittest.cc @@ -163,9 +163,11 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { SetChannelReady(); webrtc::DataBuffer buffer("abcd"); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); + size_t successful_send_count = 1; EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); provider_->set_send_blocked(true); @@ -175,7 +177,13 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) { } EXPECT_EQ(buffer.data.size() * number_of_packets, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(rtc::checked_cast(number_of_packets), + EXPECT_EQ(successful_send_count, + observer_->on_buffered_amount_change_count()); + + provider_->set_send_blocked(false); + successful_send_count += number_of_packets; + EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); + EXPECT_EQ(successful_send_count, observer_->on_buffered_amount_change_count()); } @@ -188,12 +196,12 @@ TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) { provider_->set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); provider_->set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); } // Tests that no crash when the channel is blocked right away while trying to @@ -204,18 +212,18 @@ TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) { webrtc::DataBuffer buffer("abcd"); provider_->set_send_blocked(true); EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); // Set channel ready while it is still blocked. SetChannelReady(); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); // Unblock the channel to send queued data again, there should be no crash. provider_->set_send_blocked(false); SetChannelReady(); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); - EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); + EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); } // Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are