Update DataChannel bufferedamount implementation.

Call DataChannelObserver::OnBufferedAmountChange on each successful send.
Previously, the observer would get notified of buffered amount changes only when
queued send data is consumed. Data gets queued only if it cannot be sent right
away. According to the WebRTC standard[1], bufferedamount should be increased
before each sent and decreased after each successful sent. Update implementation
to be standard compliant.

Design doc: http://doc/1lorHBn-GMn5U0T0RQANxrsW0pXhw8XGZM-xZyVUOW90

[1] https://w3c.github.io/webrtc-pc/#dom-datachannel-bufferedamount

Bug: chromium:878682
Change-Id: Ife009d30c4a18dced9a54cf600a445bb1f02561d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/123237
Commit-Queue: Marina Ciocea <marinaciocea@webrtc.org>
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Karl Wiberg <kwiberg@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#27057}
This commit is contained in:
Marina Ciocea
2019-03-04 15:52:21 +01:00
committed by Commit Bot
parent ad89528051
commit e448a3fb54
4 changed files with 31 additions and 21 deletions

View File

@ -87,7 +87,7 @@ class DataChannelObserver {
// A data buffer was successfully received. // A data buffer was successfully received.
virtual void OnMessage(const DataBuffer& buffer) = 0; virtual void OnMessage(const DataBuffer& buffer) = 0;
// The data channel's buffered_amount has changed. // The data channel's buffered_amount has changed.
virtual void OnBufferedAmountChange(uint64_t previous_amount) {} virtual void OnBufferedAmountChange(uint64_t sent_data_size) {}
protected: protected:
virtual ~DataChannelObserver() = default; virtual ~DataChannelObserver() = default;

View File

@ -127,6 +127,7 @@ DataChannel::DataChannel(DataChannelProviderInterface* provider,
bytes_sent_(0), bytes_sent_(0),
messages_received_(0), messages_received_(0),
bytes_received_(0), bytes_received_(0),
buffered_amount_(0),
data_channel_type_(dct), data_channel_type_(dct),
provider_(provider), provider_(provider),
handshake_state_(kHandshakeInit), handshake_state_(kHandshakeInit),
@ -210,7 +211,7 @@ bool DataChannel::reliable() const {
} }
uint64_t DataChannel::buffered_amount() const { uint64_t DataChannel::buffered_amount() const {
return queued_send_data_.byte_count(); return buffered_amount_;
} }
void DataChannel::Close() { void DataChannel::Close() {
@ -224,6 +225,7 @@ void DataChannel::Close() {
} }
bool DataChannel::Send(const DataBuffer& buffer) { bool DataChannel::Send(const DataBuffer& buffer) {
buffered_amount_ += buffer.size();
if (state_ != kOpen) { if (state_ != kOpen) {
return false; return false;
} }
@ -429,6 +431,7 @@ void DataChannel::CloseAbruptly() {
// Closing abruptly means any queued data gets thrown away. // Closing abruptly means any queued data gets thrown away.
queued_send_data_.Clear(); queued_send_data_.Clear();
buffered_amount_ = 0;
queued_control_data_.Clear(); queued_control_data_.Clear();
// Still go to "kClosing" before "kClosed", since observers may be expecting // Still go to "kClosing" before "kClosed", since observers may be expecting
@ -548,7 +551,6 @@ void DataChannel::SendQueuedDataMessages() {
RTC_DCHECK(state_ == kOpen || state_ == kClosing); RTC_DCHECK(state_ == kOpen || state_ == kClosing);
uint64_t start_buffered_amount = buffered_amount();
while (!queued_send_data_.Empty()) { while (!queued_send_data_.Empty()) {
std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront(); std::unique_ptr<DataBuffer> buffer = queued_send_data_.PopFront();
if (!SendDataMessage(*buffer, false)) { if (!SendDataMessage(*buffer, false)) {
@ -557,10 +559,6 @@ void DataChannel::SendQueuedDataMessages() {
break; break;
} }
} }
if (observer_ && buffered_amount() < start_buffered_amount) {
observer_->OnBufferedAmountChange(start_buffered_amount);
}
} }
bool DataChannel::SendDataMessage(const DataBuffer& buffer, bool DataChannel::SendDataMessage(const DataBuffer& buffer,
@ -591,6 +589,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer,
if (success) { if (success) {
++messages_sent_; ++messages_sent_;
bytes_sent_ += buffer.size(); bytes_sent_ += buffer.size();
RTC_DCHECK(buffered_amount_ >= buffer.size());
buffered_amount_ -= buffer.size();
if (observer_ && buffer.size() > 0) {
observer_->OnBufferedAmountChange(buffer.size());
}
return true; return true;
} }
@ -614,17 +618,12 @@ bool DataChannel::SendDataMessage(const DataBuffer& buffer,
} }
bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) { bool DataChannel::QueueSendDataMessage(const DataBuffer& buffer) {
size_t start_buffered_amount = buffered_amount(); size_t start_buffered_amount = queued_send_data_.byte_count();
if (start_buffered_amount >= kMaxQueuedSendDataBytes) { if (start_buffered_amount + buffer.size() > kMaxQueuedSendDataBytes) {
RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel."; RTC_LOG(LS_ERROR) << "Can't buffer any more data for the data channel.";
return false; return false;
} }
queued_send_data_.PushBack(absl::make_unique<DataBuffer>(buffer)); queued_send_data_.PushBack(absl::make_unique<DataBuffer>(buffer));
// The buffer can have length zero, in which case there is no change.
if (observer_ && buffered_amount() > start_buffered_amount) {
observer_->OnBufferedAmountChange(start_buffered_amount);
}
return true; return true;
} }

View File

@ -269,6 +269,9 @@ class DataChannel : public DataChannelInterface, public sigslot::has_slots<> {
uint64_t bytes_sent_; uint64_t bytes_sent_;
uint32_t messages_received_; uint32_t messages_received_;
uint64_t bytes_received_; uint64_t bytes_received_;
// Number of bytes of data that have been queued using Send(). Increased
// before each transport send and decreased after each successful send.
uint64_t buffered_amount_;
cricket::DataChannelType data_channel_type_; cricket::DataChannelType data_channel_type_;
DataChannelProviderInterface* provider_; DataChannelProviderInterface* provider_;
HandshakeState handshake_state_; HandshakeState handshake_state_;

View File

@ -163,9 +163,11 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
SetChannelReady(); SetChannelReady();
webrtc::DataBuffer buffer("abcd"); webrtc::DataBuffer buffer("abcd");
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
size_t successful_send_count = 1;
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(0U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
provider_->set_send_blocked(true); provider_->set_send_blocked(true);
@ -175,7 +177,13 @@ TEST_F(SctpDataChannelTest, BufferedAmountWhenBlocked) {
} }
EXPECT_EQ(buffer.data.size() * number_of_packets, EXPECT_EQ(buffer.data.size() * number_of_packets,
webrtc_data_channel_->buffered_amount()); webrtc_data_channel_->buffered_amount());
EXPECT_EQ(rtc::checked_cast<size_t>(number_of_packets), EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count());
provider_->set_send_blocked(false);
successful_send_count += number_of_packets;
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(successful_send_count,
observer_->on_buffered_amount_change_count()); observer_->on_buffered_amount_change_count());
} }
@ -188,12 +196,12 @@ TEST_F(SctpDataChannelTest, QueuedDataSentWhenUnblocked) {
provider_->set_send_blocked(true); provider_->set_send_blocked(true);
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
provider_->set_send_blocked(false); provider_->set_send_blocked(false);
SetChannelReady(); SetChannelReady();
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
} }
// Tests that no crash when the channel is blocked right away while trying to // Tests that no crash when the channel is blocked right away while trying to
@ -204,18 +212,18 @@ TEST_F(SctpDataChannelTest, BlockedWhenSendQueuedDataNoCrash) {
webrtc::DataBuffer buffer("abcd"); webrtc::DataBuffer buffer("abcd");
provider_->set_send_blocked(true); provider_->set_send_blocked(true);
EXPECT_TRUE(webrtc_data_channel_->Send(buffer)); EXPECT_TRUE(webrtc_data_channel_->Send(buffer));
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
// Set channel ready while it is still blocked. // Set channel ready while it is still blocked.
SetChannelReady(); SetChannelReady();
EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount()); EXPECT_EQ(buffer.size(), webrtc_data_channel_->buffered_amount());
EXPECT_EQ(1U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(0U, observer_->on_buffered_amount_change_count());
// Unblock the channel to send queued data again, there should be no crash. // Unblock the channel to send queued data again, there should be no crash.
provider_->set_send_blocked(false); provider_->set_send_blocked(false);
SetChannelReady(); SetChannelReady();
EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount()); EXPECT_EQ(0U, webrtc_data_channel_->buffered_amount());
EXPECT_EQ(2U, observer_->on_buffered_amount_change_count()); EXPECT_EQ(1U, observer_->on_buffered_amount_change_count());
} }
// Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are // Tests that DataChannel::messages_sent() and DataChannel::bytes_sent() are