Revert "dcsctp: Use stream scheduler in send queue"
This reverts commit d729d12454906d924d5a142deb3432e2d5fa97ae.

Reason for revert: Breaks downstream project.

Original change's description:
> dcsctp: Use stream scheduler in send queue
>
> Changing the currently embedded scheduler that was implemented using a
> revolving pointer, to the parameterized stream scheduler that is
> implemented using a "virtual finish time" approach.
>
> Also renamed StreamCallback to StreamProducer, per review comments.
>
> Bug: webrtc:5696
> Change-Id: I7719678776ddbe05b688ada1b52887e5ca2fb206
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/262160
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Victor Boivie <boivie@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#37170}

Bug: webrtc:5696
Change-Id: Iaf3608b52a31eb31b4ca604539edb2e8ca89399b
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/265389
Auto-Submit: Victor Boivie <boivie@webrtc.org>
Commit-Queue: Tomas Gunnarsson <tommi@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Cr-Commit-Position: refs/heads/main@{#37172}
Committed by: WebRTC LUCI CQ
Parent: 6fd77f3d3f
Commit: 5df960d307
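For context on what this revert switches between: the round-robin send queue picks the next stream with a revolving pointer (resume the scan just after the stream that sent last, wrapping around once), while the stream scheduler picks the active stream with the smallest "virtual finish time", advancing that stream's virtual clock by the chunk size divided by its priority (weighted fair queueing). The sketch below is illustrative only; it does not use the dcsctp types or the actual StreamScheduler/RRSendQueue APIs, and every name in it (StreamState, PickByVirtualFinishTime, PickRoundRobin) is invented for the example.

// Illustrative sketch only: not the dcsctp API.
#include <cstddef>
#include <map>
#include <optional>

struct StreamState {
  size_t priority = 1;          // Higher priority => virtual time advances slower.
  size_t bytes_queued = 0;      // Size of the next message; 0 = nothing to send.
  double virtual_finish = 0.0;  // Virtual finish time of this stream's next send.
};

// "Virtual finish time" selection: pick the active stream whose next chunk
// finishes earliest in virtual time, then advance its virtual clock by
// bytes / priority.
std::optional<int> PickByVirtualFinishTime(std::map<int, StreamState>& streams) {
  std::optional<int> best;
  for (const auto& [id, s] : streams) {
    if (s.bytes_queued == 0) {
      continue;
    }
    if (!best || s.virtual_finish < streams.at(*best).virtual_finish) {
      best = id;
    }
  }
  if (best) {
    StreamState& s = streams.at(*best);
    s.virtual_finish += static_cast<double>(s.bytes_queued) / s.priority;
  }
  return best;
}

// Round-robin selection with a revolving pointer: continue the search after
// the stream that sent last, wrapping around once.
std::optional<int> PickRoundRobin(std::map<int, StreamState>& streams,
                                  int& current_id) {
  auto start = streams.upper_bound(current_id);
  for (auto it = start; it != streams.end(); ++it) {
    if (it->second.bytes_queued > 0) {
      return current_id = it->first;
    }
  }
  for (auto it = streams.begin(); it != start; ++it) {
    if (it->second.bytes_queued > 0) {
      return current_id = it->first;
    }
  }
  return std::nullopt;
}

With equal priorities and similar chunk sizes the two policies produce essentially the same interleaving; the difference only shows up when streams are given different priorities.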
--- a/net/dcsctp/tx/BUILD.gn
+++ b/net/dcsctp/tx/BUILD.gn
@@ -14,7 +14,6 @@ rtc_source_set("send_queue") {
     "../common:internal_types",
     "../packet:chunk",
     "../packet:data",
-    "../public:socket",
     "../public:types",
   ]
   sources = [ "send_queue.h" ]
@@ -24,12 +23,9 @@ rtc_source_set("send_queue") {
 rtc_library("rr_send_queue") {
   deps = [
     ":send_queue",
-    ":stream_scheduler",
     "../../../api:array_view",
     "../../../rtc_base:checks",
     "../../../rtc_base:logging",
-    "../../../rtc_base/containers:flat_map",
-    "../common:str_join",
     "../packet:data",
     "../public:socket",
     "../public:types",
@@ -184,7 +180,6 @@ if (rtc_include_tests) {
       "../common:sequence_numbers",
       "../packet:chunk",
      "../packet:data",
-      "../packet:sctp_packet",
      "../public:socket",
      "../public:types",
      "../testing:data_generator",
--- a/net/dcsctp/tx/rr_send_queue.cc
+++ b/net/dcsctp/tx/rr_send_queue.cc
@@ -13,14 +13,12 @@
 #include <deque>
 #include <limits>
 #include <map>
-#include <set>
 #include <utility>
 #include <vector>
 
 #include "absl/algorithm/container.h"
 #include "absl/types/optional.h"
 #include "api/array_view.h"
-#include "net/dcsctp/common/str_join.h"
 #include "net/dcsctp/packet/data.h"
 #include "net/dcsctp/public/dcsctp_message.h"
 #include "net/dcsctp/public/dcsctp_socket.h"
@@ -44,18 +42,18 @@ RRSendQueue::RRSendQueue(absl::string_view log_prefix,
   total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
 }
 
-size_t RRSendQueue::OutgoingStream::bytes_to_send_in_next_message() const {
+bool RRSendQueue::OutgoingStream::HasDataToSend() const {
   if (pause_state_ == PauseState::kPaused ||
       pause_state_ == PauseState::kResetting) {
     // The stream has paused (and there is no partially sent message).
-    return 0;
+    return false;
   }
 
   if (items_.empty()) {
-    return 0;
+    return false;
   }
 
-  return items_.front().remaining_size;
+  return true;
 }
 
 void RRSendQueue::OutgoingStream::AddHandoverState(
@@ -63,30 +61,29 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
   state.next_ssn = next_ssn_.value();
   state.next_ordered_mid = next_ordered_mid_.value();
   state.next_unordered_mid = next_unordered_mid_.value();
-  state.priority = *scheduler_stream_->priority();
+  state.priority = *priority_;
 }
 
 bool RRSendQueue::IsConsistent() const {
-  std::set<StreamID> expected_active_streams;
-  std::set<StreamID> actual_active_streams;
-
   size_t total_buffered_amount = 0;
-  for (const auto& [stream_id, stream] : streams_) {
+  for (const auto& [unused, stream] : streams_) {
     total_buffered_amount += stream.buffered_amount().value();
-    if (stream.bytes_to_send_in_next_message() > 0) {
-      expected_active_streams.emplace(stream_id);
+  }
+
+  if (previous_message_has_ended_) {
+    auto it = streams_.find(current_stream_id_);
+    if (it != streams_.end() && it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has ended, but still partial message in stream";
+      return false;
+    }
+  } else {
+    auto it = streams_.find(current_stream_id_);
+    if (it == streams_.end() || !it->second.has_partially_sent_message()) {
+      RTC_DLOG(LS_ERROR)
+          << "Previous message has NOT ended, but there is no partial message";
+      return false;
     }
   }
-  for (const auto& stream : scheduler_.ActiveStreamsForTesting()) {
-    actual_active_streams.emplace(stream->stream_id());
-  }
-  if (expected_active_streams != actual_active_streams) {
-    auto fn = [&](rtc::StringBuilder& sb, const auto& p) { sb << *p; };
-    RTC_DLOG(LS_ERROR) << "Active streams mismatch, is=["
-                       << StrJoin(actual_active_streams, ",", fn)
-                       << "], expected=["
-                       << StrJoin(expected_active_streams, ",", fn) << "]";
-    return false;
-  }
 
   return total_buffered_amount == total_buffered_amount_.value();
@@ -121,15 +118,10 @@ void RRSendQueue::ThresholdWatcher::SetLowThreshold(size_t low_threshold) {
 void RRSendQueue::OutgoingStream::Add(DcSctpMessage message,
                                       TimeMs expires_at,
                                       const SendOptions& send_options) {
-  bool was_active = bytes_to_send_in_next_message() > 0;
   buffered_amount_.Increase(message.payload().size());
   total_buffered_amount_.Increase(message.payload().size());
   items_.emplace_back(std::move(message), expires_at, send_options);
 
-  if (!was_active) {
-    scheduler_stream_->MaybeMakeActive();
-  }
-
   RTC_DCHECK(IsConsistent());
 }
 
@@ -235,15 +227,8 @@ bool RRSendQueue::OutgoingStream::Discard(IsUnordered unordered,
   total_buffered_amount_.Decrease(item.remaining_size);
   items_.pop_front();
 
-  // Only partially sent messages are discarded, so if a message was
-  // discarded, then it was the currently sent message.
-  scheduler_stream_->ForceReschedule();
-
   if (pause_state_ == PauseState::kPending) {
     pause_state_ = PauseState::kPaused;
-    scheduler_stream_->MakeInactive();
-  } else if (bytes_to_send_in_next_message() == 0) {
-    scheduler_stream_->MakeInactive();
   }
 
   // As the item still existed, it had unsent data.
@@ -292,7 +277,6 @@ void RRSendQueue::OutgoingStream::Pause() {
   if (had_pending_items && pause_state_ == PauseState::kPaused) {
     RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
                          << " was previously active, but is now paused.";
-    scheduler_stream_->MakeInactive();
   }
 
   RTC_DCHECK(IsConsistent());
@@ -300,8 +284,11 @@ void RRSendQueue::OutgoingStream::Pause() {
 
 void RRSendQueue::OutgoingStream::Resume() {
   RTC_DCHECK(pause_state_ == PauseState::kResetting);
+  if (!items_.empty()) {
+    RTC_DLOG(LS_VERBOSE) << "Stream " << *stream_id()
+                         << " was previously paused, but is now active.";
+  }
   pause_state_ = PauseState::kNotPaused;
-  scheduler_stream_->MaybeMakeActive();
   RTC_DCHECK(IsConsistent());
 }
 
@@ -309,11 +296,6 @@ void RRSendQueue::OutgoingStream::Reset() {
   // This can be called both when an outgoing stream reset has been responded
   // to, or when the entire SendQueue is reset due to detecting the peer having
   // restarted. The stream may be in any state at this time.
-  PauseState old_pause_state = pause_state_;
-  pause_state_ = PauseState::kNotPaused;
-  next_ordered_mid_ = MID(0);
-  next_unordered_mid_ = MID(0);
-  next_ssn_ = SSN(0);
   if (!items_.empty()) {
     // If this message has been partially sent, reset it so that it will be
     // re-sent.
@@ -327,11 +309,11 @@ void RRSendQueue::OutgoingStream::Reset() {
     item.message_id = absl::nullopt;
     item.ssn = absl::nullopt;
     item.current_fsn = FSN(0);
-    if (old_pause_state == PauseState::kPaused ||
-        old_pause_state == PauseState::kResetting) {
-      scheduler_stream_->MaybeMakeActive();
-    }
   }
+  pause_state_ = PauseState::kNotPaused;
+  next_ordered_mid_ = MID(0);
+  next_unordered_mid_ = MID(0);
+  next_ssn_ = SSN(0);
   RTC_DCHECK(IsConsistent());
 }
 
@@ -368,9 +350,67 @@ bool RRSendQueue::IsEmpty() const {
   return total_buffered_amount() == 0;
 }
 
+std::map<StreamID, RRSendQueue::OutgoingStream>::iterator
+RRSendQueue::GetNextStream() {
+  auto start_it = streams_.lower_bound(StreamID(*current_stream_id_ + 1));
+
+  for (auto it = start_it; it != streams_.end(); ++it) {
+    if (it->second.HasDataToSend()) {
+      current_stream_id_ = it->first;
+      return it;
+    }
+  }
+
+  for (auto it = streams_.begin(); it != start_it; ++it) {
+    if (it->second.HasDataToSend()) {
+      current_stream_id_ = it->first;
+      return it;
+    }
+  }
+  return streams_.end();
+}
+
 absl::optional<SendQueue::DataToSend> RRSendQueue::Produce(TimeMs now,
                                                            size_t max_size) {
-  return scheduler_.Produce(now, max_size);
+  std::map<StreamID, RRSendQueue::OutgoingStream>::iterator stream_it;
+
+  for (;;) {
+    if (previous_message_has_ended_) {
+      // Previous message has ended. Round-robin to a different stream, if
+      // there even is one with data to send.
+      stream_it = GetNextStream();
+      if (stream_it == streams_.end()) {
+        RTC_DLOG(LS_VERBOSE)
+            << log_prefix_
+            << "There is no stream with data; Can't produce any data.";
+        return absl::nullopt;
+      }
+    } else {
+      // The previous message has not ended; Continue from the current stream.
+      stream_it = streams_.find(current_stream_id_);
+      RTC_DCHECK(stream_it != streams_.end());
+    }
+
+    absl::optional<DataToSend> data = stream_it->second.Produce(now, max_size);
+    if (!data.has_value()) {
+      continue;
+    }
+    RTC_DLOG(LS_VERBOSE) << log_prefix_ << "Producing DATA, type="
+                         << (data->data.is_unordered ? "unordered" : "ordered")
+                         << "::"
+                         << (*data->data.is_beginning && *data->data.is_end
+                                 ? "complete"
+                                 : *data->data.is_beginning ? "first"
+                                   : *data->data.is_end     ? "last"
+                                                            : "middle")
+                         << ", stream_id=" << *stream_it->first
+                         << ", ppid=" << *data->data.ppid
+                         << ", length=" << data->data.payload.size();
+
+    previous_message_has_ended_ = *data->data.is_end;
+    RTC_DCHECK(IsConsistent());
+    return data;
+  }
 }
 
 bool RRSendQueue::Discard(IsUnordered unordered,
@@ -378,8 +418,12 @@ bool RRSendQueue::Discard(IsUnordered unordered,
                           MID message_id) {
   bool has_discarded =
       GetOrCreateStreamInfo(stream_id).Discard(unordered, message_id);
+  if (has_discarded) {
+    // Only partially sent messages are discarded, so if a message was
+    // discarded, then it was the currently sent message.
+    previous_message_has_ended_ = true;
+  }
 
-  RTC_DCHECK(IsConsistent());
   return has_discarded;
 }
 
@@ -440,7 +484,7 @@ void RRSendQueue::Reset() {
   for (auto& [unused, stream] : streams_) {
     stream.Reset();
   }
-  scheduler_.ForceReschedule();
+  previous_message_has_ended_ = true;
 }
 
 size_t RRSendQueue::buffered_amount(StreamID stream_id) const {
@@ -472,9 +516,9 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
   }
 
   return streams_
-      .emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
-               std::forward_as_tuple(
-                   &scheduler_, stream_id, default_priority_,
+      .emplace(stream_id,
+               OutgoingStream(
+                   stream_id, default_priority_,
                    [this, stream_id]() { on_buffered_amount_low_(stream_id); },
                    total_buffered_amount_))
       .first->second;
@@ -518,9 +562,9 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
        state.tx.streams) {
     StreamID stream_id(state_stream.id);
     streams_.emplace(
-        std::piecewise_construct, std::forward_as_tuple(stream_id),
-        std::forward_as_tuple(
-            &scheduler_, stream_id, StreamPriority(state_stream.priority),
+        stream_id,
+        OutgoingStream(
+            stream_id, StreamPriority(state_stream.priority),
             [this, stream_id]() { on_buffered_amount_low_(stream_id); },
            total_buffered_amount_, &state_stream));
   }
--- a/net/dcsctp/tx/rr_send_queue.h
+++ b/net/dcsctp/tx/rr_send_queue.h
@@ -13,7 +13,6 @@
 #include <cstdint>
 #include <deque>
 #include <map>
-#include <memory>
 #include <string>
 #include <utility>
 #include <vector>
@@ -26,7 +25,6 @@
 #include "net/dcsctp/public/dcsctp_socket.h"
 #include "net/dcsctp/public/types.h"
 #include "net/dcsctp/tx/send_queue.h"
-#include "net/dcsctp/tx/stream_scheduler.h"
 
 namespace dcsctp {
 
@@ -113,33 +111,32 @@ class RRSendQueue : public SendQueue {
   };
 
   // Per-stream information.
-  class OutgoingStream : public StreamScheduler::StreamProducer {
+  class OutgoingStream {
    public:
     OutgoingStream(
-        StreamScheduler* scheduler,
         StreamID stream_id,
         StreamPriority priority,
         std::function<void()> on_buffered_amount_low,
         ThresholdWatcher& total_buffered_amount,
         const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
-        : scheduler_stream_(scheduler->CreateStream(this, stream_id, priority)),
+        : stream_id_(stream_id),
+          priority_(priority),
           next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
           next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
           next_ssn_(SSN(state ? state->next_ssn : 0)),
           buffered_amount_(std::move(on_buffered_amount_low)),
           total_buffered_amount_(total_buffered_amount) {}
 
-    StreamID stream_id() const { return scheduler_stream_->stream_id(); }
+    StreamID stream_id() const { return stream_id_; }
 
     // Enqueues a message to this stream.
     void Add(DcSctpMessage message,
             TimeMs expires_at,
             const SendOptions& send_options);
 
-    // Implementing `StreamScheduler::StreamProducer`.
-    absl::optional<SendQueue::DataToSend> Produce(TimeMs now,
-                                                  size_t max_size) override;
-    size_t bytes_to_send_in_next_message() const override;
+    // Produces a data chunk to send, or `absl::nullopt` if nothing could be
+    // produced, e.g. if all messages have expired.
+    absl::optional<DataToSend> Produce(TimeMs now, size_t max_size);
 
     const ThresholdWatcher& buffered_amount() const { return buffered_amount_; }
     ThresholdWatcher& buffered_amount() { return buffered_amount_; }
@@ -170,10 +167,12 @@ class RRSendQueue : public SendQueue {
     // Indicates if this stream has a partially sent message in it.
     bool has_partially_sent_message() const;
 
-    StreamPriority priority() const { return scheduler_stream_->priority(); }
-    void set_priority(StreamPriority priority) {
-      scheduler_stream_->set_priority(priority);
-    }
+    // Indicates if the stream possibly has data to send. Note that it may
+    // return `true` for streams that have enqueued, but expired, messages.
+    bool HasDataToSend() const;
+
+    void set_priority(StreamPriority priority) { priority_ = priority; }
+    StreamPriority priority() const { return priority_; }
 
     void AddHandoverState(
         DcSctpSocketHandoverState::OutgoingStream& state) const;
@@ -226,8 +225,8 @@ class RRSendQueue : public SendQueue {
 
     bool IsConsistent() const;
 
-    const std::unique_ptr<StreamScheduler::Stream> scheduler_stream_;
+    const StreamID stream_id_;
+    StreamPriority priority_;
     PauseState pause_state_ = PauseState::kNotPaused;
     // MIDs are different for unordered and ordered messages sent on a stream.
     MID next_unordered_mid_;
@@ -252,10 +251,12 @@ class RRSendQueue : public SendQueue {
       TimeMs now,
       size_t max_size);
 
+  // Return the next stream, in round-robin fashion.
+  std::map<StreamID, OutgoingStream>::iterator GetNextStream();
+
   const std::string log_prefix_;
   const size_t buffer_size_;
   const StreamPriority default_priority_;
-  StreamScheduler scheduler_;
 
   // Called when the buffered amount is below what has been set using
   // `SetBufferedAmountLowThreshold`.
@@ -268,6 +269,15 @@ class RRSendQueue : public SendQueue {
   // The total amount of buffer data, for all streams.
   ThresholdWatcher total_buffered_amount_;
 
+  // Indicates if the previous fragment sent was the end of a message. For
+  // non-interleaved sending, this means that the next message may come from a
+  // different stream. If not true, the next fragment must be produced from the
+  // same stream as last time.
+  bool previous_message_has_ended_ = true;
+
+  // The current stream to send chunks from. Modified by `GetNextStream`.
+  StreamID current_stream_id_ = StreamID(0);
+
   // All streams, and messages added to those.
   std::map<StreamID, OutgoingStream> streams_;
 };
--- a/net/dcsctp/tx/stream_scheduler.cc
+++ b/net/dcsctp/tx/stream_scheduler.cc
@@ -114,7 +114,7 @@ StreamScheduler::VirtualTime StreamScheduler::Stream::GetNextFinishTime()
 absl::optional<SendQueue::DataToSend> StreamScheduler::Stream::Produce(
     TimeMs now,
     size_t max_size) {
-  absl::optional<SendQueue::DataToSend> data = producer_.Produce(now, max_size);
+  absl::optional<SendQueue::DataToSend> data = callback_.Produce(now, max_size);
 
   if (data.has_value()) {
     VirtualTime new_current = GetNextFinishTime();
--- a/net/dcsctp/tx/stream_scheduler.h
+++ b/net/dcsctp/tx/stream_scheduler.h
@@ -56,9 +56,9 @@ class StreamScheduler {
   };
 
  public:
-  class StreamProducer {
+  class StreamCallback {
    public:
-    virtual ~StreamProducer() = default;
+    virtual ~StreamCallback() = default;
 
     // Produces a fragment of data to send. The current wall time is specified
     // as `now` and should be used to skip chunks with expired limited lifetime.
@@ -99,11 +99,11 @@ class StreamScheduler {
     friend class StreamScheduler;
 
     Stream(StreamScheduler* parent,
-           StreamProducer* producer,
+           StreamCallback* callback,
            StreamID stream_id,
           StreamPriority priority)
        : parent_(*parent),
-          producer_(*producer),
+          callback_(*callback),
          stream_id_(stream_id),
          priority_(priority) {}
 
@@ -117,14 +117,14 @@ class StreamScheduler {
     VirtualTime current_time() const { return current_virtual_time_; }
     VirtualTime next_finish_time() const { return next_finish_time_; }
     size_t bytes_to_send_in_next_message() const {
-      return producer_.bytes_to_send_in_next_message();
+      return callback_.bytes_to_send_in_next_message();
     }
 
     // Returns the next virtual finish time for this stream.
     VirtualTime GetNextFinishTime() const;
 
     StreamScheduler& parent_;
-    StreamProducer& producer_;
+    StreamCallback& callback_;
     const StreamID stream_id_;
     StreamPriority priority_;
     // This outgoing stream's "current" virtual_time.
@@ -132,10 +132,10 @@ class StreamScheduler {
     VirtualTime next_finish_time_ = VirtualTime::Zero();
   };
 
-  std::unique_ptr<Stream> CreateStream(StreamProducer* producer,
+  std::unique_ptr<Stream> CreateStream(StreamCallback* callback,
                                        StreamID stream_id,
                                        StreamPriority priority) {
-    return absl::WrapUnique(new Stream(this, producer, stream_id, priority));
+    return absl::WrapUnique(new Stream(this, callback, stream_id, priority));
   }
 
   // Makes the scheduler stop producing message from the current stream and
--- a/net/dcsctp/tx/stream_scheduler_test.cc
+++ b/net/dcsctp/tx/stream_scheduler_test.cc
@@ -59,7 +59,7 @@ std::map<StreamID, size_t> GetPacketCounts(StreamScheduler& scheduler,
   return packet_counts;
 }
 
-class MockStreamProducer : public StreamScheduler::StreamProducer {
+class MockStreamCallback : public StreamScheduler::StreamCallback {
  public:
   MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
               Produce,
@@ -74,18 +74,18 @@ class TestStream {
              StreamID stream_id,
              StreamPriority priority,
              size_t packet_size = kPayloadSize) {
-    EXPECT_CALL(producer_, Produce)
+    EXPECT_CALL(callback_, Produce)
         .WillRepeatedly(CreateChunk(stream_id, MID(0), packet_size));
-    EXPECT_CALL(producer_, bytes_to_send_in_next_message)
+    EXPECT_CALL(callback_, bytes_to_send_in_next_message)
         .WillRepeatedly(Return(packet_size));
-    stream_ = scheduler.CreateStream(&producer_, stream_id, priority);
+    stream_ = scheduler.CreateStream(&callback_, stream_id, priority);
     stream_->MaybeMakeActive();
   }
 
   StreamScheduler::Stream& stream() { return *stream_; }
 
  private:
-  StrictMock<MockStreamProducer> producer_;
+  StrictMock<MockStreamCallback> callback_;
   std::unique_ptr<StreamScheduler::Stream> stream_;
 };
 
@@ -100,9 +100,9 @@ TEST(StreamSchedulerTest, HasNoActiveStreams) {
 TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer;
+  StrictMock<MockStreamCallback> callback;
   auto stream =
-      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
 
   EXPECT_EQ(stream->stream_id(), StreamID(1));
   EXPECT_EQ(stream->priority(), StreamPriority(2));
@@ -115,13 +115,13 @@ TEST(StreamSchedulerTest, CanSetAndGetStreamProperties) {
 TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer;
-  EXPECT_CALL(producer, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
-  EXPECT_CALL(producer, bytes_to_send_in_next_message)
+  StrictMock<MockStreamCallback> callback;
+  EXPECT_CALL(callback, Produce).WillOnce(CreateChunk(StreamID(1), MID(0)));
+  EXPECT_CALL(callback, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(0));
   auto stream =
-      scheduler.CreateStream(&producer, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback, StreamID(1), StreamPriority(2));
   stream->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(0)));
@@ -132,32 +132,32 @@ TEST(StreamSchedulerTest, CanProduceFromSingleStream) {
 TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce(CreateChunk(StreamID(1), MID(101)))
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
     .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
     .WillOnce(CreateChunk(StreamID(2), MID(200)))
     .WillOnce(CreateChunk(StreamID(2), MID(201)))
    .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -174,8 +174,8 @@ TEST(StreamSchedulerTest, WillRoundRobinBetweenStreams) {
 TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
      .WillOnce(CreateChunk(StreamID(1), MID(100)))
      .WillOnce([](...) {
        return SendQueue::DataToSend(
@@ -196,7 +196,7 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
                Data::IsEnd(true), IsUnordered(true)));
      })
      .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
      .WillOnce(Return(kPayloadSize))  // When making active
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(kPayloadSize))
@@ -204,21 +204,21 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
      .WillOnce(Return(kPayloadSize))
      .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
     .WillOnce(CreateChunk(StreamID(2), MID(200)))
    .WillOnce(CreateChunk(StreamID(2), MID(201)))
    .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -236,16 +236,16 @@ TEST(StreamSchedulerTest, WillRoundRobinOnlyWhenFinishedProducingChunk) {
 TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
     .WillOnce(CreateChunk(StreamID(1), MID(100)))
    .WillOnce(CreateChunk(StreamID(1), MID(101)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize));  // hints that there is a MID(2) coming.
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -260,20 +260,20 @@ TEST(StreamSchedulerTest, StreamsCanBeMadeInactive) {
 TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer1;
+  StrictMock<MockStreamCallback> callback1;
   // Callbacks are setup so that they hint that there is a MID(2) coming...
-  EXPECT_CALL(producer1, Produce)
+  EXPECT_CALL(callback1, Produce)
    .WillOnce(CreateChunk(StreamID(1), MID(100)))
    .WillOnce(CreateChunk(StreamID(1), MID(101)))
    .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))  // When making active again
    .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));
@@ -290,33 +290,33 @@ TEST(StreamSchedulerTest, SingleStreamCanBeResumed) {
 TEST(StreamSchedulerTest, WillRoundRobinWithPausedStream) {
   StreamScheduler scheduler;
 
-  StrictMock<MockStreamProducer> producer1;
-  EXPECT_CALL(producer1, Produce)
+  StrictMock<MockStreamCallback> callback1;
+  EXPECT_CALL(callback1, Produce)
    .WillOnce(CreateChunk(StreamID(1), MID(100)))
    .WillOnce(CreateChunk(StreamID(1), MID(101)))
    .WillOnce(CreateChunk(StreamID(1), MID(102)));
-  EXPECT_CALL(producer1, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback1, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(0));
   auto stream1 =
-      scheduler.CreateStream(&producer1, StreamID(1), StreamPriority(2));
+      scheduler.CreateStream(&callback1, StreamID(1), StreamPriority(2));
   stream1->MaybeMakeActive();
 
-  StrictMock<MockStreamProducer> producer2;
-  EXPECT_CALL(producer2, Produce)
+  StrictMock<MockStreamCallback> callback2;
+  EXPECT_CALL(callback2, Produce)
    .WillOnce(CreateChunk(StreamID(2), MID(200)))
    .WillOnce(CreateChunk(StreamID(2), MID(201)))
    .WillOnce(CreateChunk(StreamID(2), MID(202)));
-  EXPECT_CALL(producer2, bytes_to_send_in_next_message)
+  EXPECT_CALL(callback2, bytes_to_send_in_next_message)
    .WillOnce(Return(kPayloadSize))  // When making active
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(kPayloadSize))
    .WillOnce(Return(0));
   auto stream2 =
-      scheduler.CreateStream(&producer2, StreamID(2), StreamPriority(2));
+      scheduler.CreateStream(&callback2, StreamID(2), StreamPriority(2));
   stream2->MaybeMakeActive();
 
   EXPECT_THAT(scheduler.Produce(TimeMs(0), kMtu), HasDataWithMid(MID(100)));