Revert "dcsctp: Add public API for setting priorities"

This reverts commit 17a02a31d7d2897b75ad69fdac5d10e7475a5865.

Reason for revert: Breaks downstream test

Original change's description:
> dcsctp: Add public API for setting priorities
>
> This is the first part of supporting stream priorities, and adds the API
> and very basic support for setting and retrieving the stream priority.
>
> This commit doesn't in any way change the actual packet sending - the
> specified priority values are stored, but not acted on.
>
> This is all that is client visible, so clients can start using the API
> as written, and they would never notice that things are missing.
>
> Bug: webrtc:5696
> Change-Id: I24fce8cbb6f3cba187df99d1d3f45e73621c93c6
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/261943
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Victor Boivie <boivie@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#37034}

Bug: webrtc:5696
Change-Id: If172d9c9dbce7aae72152abbbae1ccc77340bbc1
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/264444
Owners-Override: Björn Terelius <terelius@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Commit-Queue: Björn Terelius <terelius@webrtc.org>
Auto-Submit: Björn Terelius <terelius@webrtc.org>
Reviewed-by: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37039}
This commit is contained in:
Björn Terelius
2022-05-30 14:04:06 +00:00
committed by WebRTC LUCI CQ
parent a136ed4085
commit 51e5bacb8b
11 changed files with 8 additions and 158 deletions

View File

@ -48,7 +48,6 @@ struct DcSctpSocketHandoverState {
uint32_t next_ssn = 0; uint32_t next_ssn = 0;
uint32_t next_unordered_mid = 0; uint32_t next_unordered_mid = 0;
uint32_t next_ordered_mid = 0; uint32_t next_ordered_mid = 0;
uint16_t priority = 0;
}; };
struct Transmission { struct Transmission {
uint32_t next_tsn = 0; uint32_t next_tsn = 0;

View File

@ -71,11 +71,6 @@ struct DcSctpOptions {
// `max_receiver_window_buffer_size`). // `max_receiver_window_buffer_size`).
size_t max_message_size = 256 * 1024; size_t max_message_size = 256 * 1024;
// The default stream priority, if not overridden by
// `SctpSocket::SetStreamPriority`. The default value is selected to be
// compatible with https://www.w3.org/TR/webrtc-priority/, section 4.2-4.3.
StreamPriority default_stream_priority = StreamPriority(256);
// Maximum received window buffer size. This should be a bit larger than the // Maximum received window buffer size. This should be a bit larger than the
// largest sized message you want to be able to receive. This essentially // largest sized message you want to be able to receive. This essentially
// limits the memory usage on the receive side. Note that memory is allocated // limits the memory usage on the receive side. Note that memory is allocated

View File

@ -430,15 +430,6 @@ class DcSctpSocketInterface {
// Update the options max_message_size. // Update the options max_message_size.
virtual void SetMaxMessageSize(size_t max_message_size) = 0; virtual void SetMaxMessageSize(size_t max_message_size) = 0;
// Sets the priority of an outgoing stream. The initial value, when not set,
// is `DcSctpOptions::default_stream_priority`.
virtual void SetStreamPriority(StreamID stream_id,
StreamPriority priority) = 0;
// Returns the currently set priority for an outgoing stream. The initial
// value, when not set, is `DcSctpOptions::default_stream_priority`.
virtual StreamPriority GetStreamPriority(StreamID stream_id) const = 0;
// Sends the message `message` using the provided send options. // Sends the message `message` using the provided send options.
// Sending a message is an asynchrous operation, and the `OnError` callback // Sending a message is an asynchrous operation, and the `OnError` callback
// may be invoked to indicate any errors in sending the message. // may be invoked to indicate any errors in sending the message.

View File

@ -41,16 +41,6 @@ class MockDcSctpSocket : public DcSctpSocketInterface {
MOCK_METHOD(void, SetMaxMessageSize, (size_t max_message_size), (override)); MOCK_METHOD(void, SetMaxMessageSize, (size_t max_message_size), (override));
MOCK_METHOD(void,
SetStreamPriority,
(StreamID stream_id, StreamPriority priority),
(override));
MOCK_METHOD(StreamPriority,
GetStreamPriority,
(StreamID stream_id),
(const, override));
MOCK_METHOD(SendStatus, MOCK_METHOD(SendStatus,
Send, Send,
(DcSctpMessage message, const SendOptions& send_options), (DcSctpMessage message, const SendOptions& send_options),

View File

@ -31,10 +31,6 @@ using TimeoutID = webrtc::StrongAlias<class TimeoutTag, uint64_t>;
// other messages on the same stream. // other messages on the same stream.
using IsUnordered = webrtc::StrongAlias<class IsUnorderedTag, bool>; using IsUnordered = webrtc::StrongAlias<class IsUnorderedTag, bool>;
// Stream priority, where higher values indicate higher priority. The meaning of
// this value and how it's used depends on the stream scheduler.
using StreamPriority = webrtc::StrongAlias<class StreamPriorityTag, uint16_t>;
// Duration, as milliseconds. Overflows after 24 days. // Duration, as milliseconds. Overflows after 24 days.
class DurationMs : public webrtc::StrongAlias<class DurationMsTag, int32_t> { class DurationMs : public webrtc::StrongAlias<class DurationMsTag, int32_t> {
public: public:

View File

@ -189,7 +189,6 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
send_queue_( send_queue_(
log_prefix_, log_prefix_,
options_.max_send_buffer_size, options_.max_send_buffer_size,
options_.default_stream_priority,
[this](StreamID stream_id) { [this](StreamID stream_id) {
callbacks_.OnBufferedAmountLow(stream_id); callbacks_.OnBufferedAmountLow(stream_id);
}, },
@ -421,14 +420,6 @@ void DcSctpSocket::InternalClose(ErrorKind error, absl::string_view message) {
RTC_DCHECK(IsConsistent()); RTC_DCHECK(IsConsistent());
} }
void DcSctpSocket::SetStreamPriority(StreamID stream_id,
StreamPriority priority) {
send_queue_.SetStreamPriority(stream_id, priority);
}
StreamPriority DcSctpSocket::GetStreamPriority(StreamID stream_id) const {
return send_queue_.GetStreamPriority(stream_id);
}
SendStatus DcSctpSocket::Send(DcSctpMessage message, SendStatus DcSctpSocket::Send(DcSctpMessage message,
const SendOptions& send_options) { const SendOptions& send_options) {
RTC_DCHECK_RUN_ON(&thread_checker_); RTC_DCHECK_RUN_ON(&thread_checker_);

View File

@ -96,8 +96,6 @@ class DcSctpSocket : public DcSctpSocketInterface {
SocketState state() const override; SocketState state() const override;
const DcSctpOptions& options() const override { return options_; } const DcSctpOptions& options() const override { return options_; }
void SetMaxMessageSize(size_t max_message_size) override; void SetMaxMessageSize(size_t max_message_size) override;
void SetStreamPriority(StreamID stream_id, StreamPriority priority) override;
StreamPriority GetStreamPriority(StreamID stream_id) const override;
size_t buffered_amount(StreamID stream_id) const override; size_t buffered_amount(StreamID stream_id) const override;
size_t buffered_amount_low_threshold(StreamID stream_id) const override; size_t buffered_amount_low_threshold(StreamID stream_id) const override;
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;

View File

@ -2333,51 +2333,6 @@ TEST(DcSctpSocketTest, CloseStreamsWithPendingRequest) {
absl::optional<DcSctpMessage> msg6 = z.cb.ConsumeReceivedMessage(); absl::optional<DcSctpMessage> msg6 = z.cb.ConsumeReceivedMessage();
ASSERT_TRUE(msg6.has_value()); ASSERT_TRUE(msg6.has_value());
EXPECT_EQ(msg6->stream_id(), StreamID(3)); EXPECT_EQ(msg6->stream_id(), StreamID(3));
} } // namespace
TEST(DcSctpSocketTest, StreamsHaveInitialPriority) {
DcSctpOptions options = {.default_stream_priority = StreamPriority(42)};
SocketUnderTest a("A", options);
EXPECT_EQ(a.socket.GetStreamPriority(StreamID(1)),
options.default_stream_priority);
a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), kSendOptions);
EXPECT_EQ(a.socket.GetStreamPriority(StreamID(2)),
options.default_stream_priority);
}
TEST(DcSctpSocketTest, CanChangeStreamPriority) {
DcSctpOptions options = {.default_stream_priority = StreamPriority(42)};
SocketUnderTest a("A", options);
a.socket.SetStreamPriority(StreamID(1), StreamPriority(43));
EXPECT_EQ(a.socket.GetStreamPriority(StreamID(1)), StreamPriority(43));
a.socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), kSendOptions);
a.socket.SetStreamPriority(StreamID(2), StreamPriority(43));
EXPECT_EQ(a.socket.GetStreamPriority(StreamID(2)), StreamPriority(43));
}
TEST_P(DcSctpSocketParametrizedTest, WillHandoverPriority) {
DcSctpOptions options = {.default_stream_priority = StreamPriority(42)};
auto a = std::make_unique<SocketUnderTest>("A", options);
SocketUnderTest z("Z");
ConnectSockets(*a, z);
a->socket.SetStreamPriority(StreamID(1), StreamPriority(43));
a->socket.Send(DcSctpMessage(StreamID(2), PPID(53), {1, 2}), kSendOptions);
a->socket.SetStreamPriority(StreamID(2), StreamPriority(43));
ExchangeMessages(*a, z);
a = MaybeHandoverSocket(std::move(a));
EXPECT_EQ(a->socket.GetStreamPriority(StreamID(1)), StreamPriority(43));
EXPECT_EQ(a->socket.GetStreamPriority(StreamID(2)), StreamPriority(43));
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp

View File

@ -30,13 +30,11 @@ namespace dcsctp {
RRSendQueue::RRSendQueue(absl::string_view log_prefix, RRSendQueue::RRSendQueue(absl::string_view log_prefix,
size_t buffer_size, size_t buffer_size,
StreamPriority default_priority,
std::function<void(StreamID)> on_buffered_amount_low, std::function<void(StreamID)> on_buffered_amount_low,
size_t total_buffered_amount_low_threshold, size_t total_buffered_amount_low_threshold,
std::function<void()> on_total_buffered_amount_low) std::function<void()> on_total_buffered_amount_low)
: log_prefix_(std::string(log_prefix) + "fcfs: "), : log_prefix_(std::string(log_prefix) + "fcfs: "),
buffer_size_(buffer_size), buffer_size_(buffer_size),
default_priority_(default_priority),
on_buffered_amount_low_(std::move(on_buffered_amount_low)), on_buffered_amount_low_(std::move(on_buffered_amount_low)),
total_buffered_amount_(std::move(on_total_buffered_amount_low)) { total_buffered_amount_(std::move(on_total_buffered_amount_low)) {
total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold); total_buffered_amount_.SetLowThreshold(total_buffered_amount_low_threshold);
@ -77,7 +75,6 @@ void RRSendQueue::OutgoingStream::AddHandoverState(
state.next_ssn = next_ssn_.value(); state.next_ssn = next_ssn_.value();
state.next_ordered_mid = next_ordered_mid_.value(); state.next_ordered_mid = next_ordered_mid_.value();
state.next_unordered_mid = next_unordered_mid_.value(); state.next_unordered_mid = next_unordered_mid_.value();
state.priority = *priority_;
} }
bool RRSendQueue::IsConsistent() const { bool RRSendQueue::IsConsistent() const {
@ -518,28 +515,12 @@ RRSendQueue::OutgoingStream& RRSendQueue::GetOrCreateStreamInfo(
return streams_ return streams_
.emplace(stream_id, .emplace(stream_id,
OutgoingStream( OutgoingStream(
stream_id, default_priority_, stream_id,
[this, stream_id]() { on_buffered_amount_low_(stream_id); }, [this, stream_id]() { on_buffered_amount_low_(stream_id); },
total_buffered_amount_)) total_buffered_amount_))
.first->second; .first->second;
} }
void RRSendQueue::SetStreamPriority(StreamID stream_id,
StreamPriority priority) {
OutgoingStream& stream = GetOrCreateStreamInfo(stream_id);
stream.set_priority(priority);
RTC_DCHECK(IsConsistent());
}
StreamPriority RRSendQueue::GetStreamPriority(StreamID stream_id) const {
auto stream_it = streams_.find(stream_id);
if (stream_it == streams_.end()) {
return default_priority_;
}
return stream_it->second.priority();
}
HandoverReadinessStatus RRSendQueue::GetHandoverReadiness() const { HandoverReadinessStatus RRSendQueue::GetHandoverReadiness() const {
HandoverReadinessStatus status; HandoverReadinessStatus status;
if (!IsEmpty()) { if (!IsEmpty()) {
@ -561,12 +542,12 @@ void RRSendQueue::RestoreFromState(const DcSctpSocketHandoverState& state) {
for (const DcSctpSocketHandoverState::OutgoingStream& state_stream : for (const DcSctpSocketHandoverState::OutgoingStream& state_stream :
state.tx.streams) { state.tx.streams) {
StreamID stream_id(state_stream.id); StreamID stream_id(state_stream.id);
streams_.emplace( streams_.emplace(stream_id, OutgoingStream(
stream_id, stream_id,
OutgoingStream( [this, stream_id]() {
stream_id, StreamPriority(state_stream.priority), on_buffered_amount_low_(stream_id);
[this, stream_id]() { on_buffered_amount_low_(stream_id); }, },
total_buffered_amount_, &state_stream)); total_buffered_amount_, &state_stream));
} }
} }
} // namespace dcsctp } // namespace dcsctp

View File

@ -43,7 +43,6 @@ class RRSendQueue : public SendQueue {
public: public:
RRSendQueue(absl::string_view log_prefix, RRSendQueue(absl::string_view log_prefix,
size_t buffer_size, size_t buffer_size,
StreamPriority default_priority,
std::function<void(StreamID)> on_buffered_amount_low, std::function<void(StreamID)> on_buffered_amount_low,
size_t total_buffered_amount_low_threshold, size_t total_buffered_amount_low_threshold,
std::function<void()> on_total_buffered_amount_low); std::function<void()> on_total_buffered_amount_low);
@ -80,8 +79,6 @@ class RRSendQueue : public SendQueue {
size_t buffered_amount_low_threshold(StreamID stream_id) const override; size_t buffered_amount_low_threshold(StreamID stream_id) const override;
void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override; void SetBufferedAmountLowThreshold(StreamID stream_id, size_t bytes) override;
void SetStreamPriority(StreamID stream_id, StreamPriority priority);
StreamPriority GetStreamPriority(StreamID stream_id) const;
HandoverReadinessStatus GetHandoverReadiness() const; HandoverReadinessStatus GetHandoverReadiness() const;
void AddHandoverState(DcSctpSocketHandoverState& state); void AddHandoverState(DcSctpSocketHandoverState& state);
void RestoreFromState(const DcSctpSocketHandoverState& state); void RestoreFromState(const DcSctpSocketHandoverState& state);
@ -115,12 +112,10 @@ class RRSendQueue : public SendQueue {
public: public:
OutgoingStream( OutgoingStream(
StreamID stream_id, StreamID stream_id,
StreamPriority priority,
std::function<void()> on_buffered_amount_low, std::function<void()> on_buffered_amount_low,
ThresholdWatcher& total_buffered_amount, ThresholdWatcher& total_buffered_amount,
const DcSctpSocketHandoverState::OutgoingStream* state = nullptr) const DcSctpSocketHandoverState::OutgoingStream* state = nullptr)
: stream_id_(stream_id), : stream_id_(stream_id),
priority_(priority),
next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)), next_unordered_mid_(MID(state ? state->next_unordered_mid : 0)),
next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)), next_ordered_mid_(MID(state ? state->next_ordered_mid : 0)),
next_ssn_(SSN(state ? state->next_ssn : 0)), next_ssn_(SSN(state ? state->next_ssn : 0)),
@ -171,9 +166,6 @@ class RRSendQueue : public SendQueue {
// expired non-partially sent message. // expired non-partially sent message.
bool HasDataToSend(TimeMs now); bool HasDataToSend(TimeMs now);
void set_priority(StreamPriority priority) { priority_ = priority; }
StreamPriority priority() const { return priority_; }
void AddHandoverState( void AddHandoverState(
DcSctpSocketHandoverState::OutgoingStream& state) const; DcSctpSocketHandoverState::OutgoingStream& state) const;
@ -226,7 +218,6 @@ class RRSendQueue : public SendQueue {
bool IsConsistent() const; bool IsConsistent() const;
const StreamID stream_id_; const StreamID stream_id_;
StreamPriority priority_;
PauseState pause_state_ = PauseState::kNotPaused; PauseState pause_state_ = PauseState::kNotPaused;
// MIDs are different for unordered and ordered messages sent on a stream. // MIDs are different for unordered and ordered messages sent on a stream.
MID next_unordered_mid_; MID next_unordered_mid_;
@ -256,7 +247,6 @@ class RRSendQueue : public SendQueue {
const std::string log_prefix_; const std::string log_prefix_;
const size_t buffer_size_; const size_t buffer_size_;
const StreamPriority default_priority_;
// Called when the buffered amount is below what has been set using // Called when the buffered amount is below what has been set using
// `SetBufferedAmountLowThreshold`. // `SetBufferedAmountLowThreshold`.

View File

@ -32,7 +32,6 @@ constexpr TimeMs kNow = TimeMs(0);
constexpr StreamID kStreamID(1); constexpr StreamID kStreamID(1);
constexpr PPID kPPID(53); constexpr PPID kPPID(53);
constexpr size_t kMaxQueueSize = 1000; constexpr size_t kMaxQueueSize = 1000;
constexpr StreamPriority kDefaultPriority(10);
constexpr size_t kBufferedAmountLowThreshold = 500; constexpr size_t kBufferedAmountLowThreshold = 500;
constexpr size_t kOneFragmentPacketSize = 100; constexpr size_t kOneFragmentPacketSize = 100;
constexpr size_t kTwoFragmentPacketSize = 101; constexpr size_t kTwoFragmentPacketSize = 101;
@ -42,7 +41,6 @@ class RRSendQueueTest : public testing::Test {
RRSendQueueTest() RRSendQueueTest()
: buf_("log: ", : buf_("log: ",
kMaxQueueSize, kMaxQueueSize,
kDefaultPriority,
on_buffered_amount_low_.AsStdFunction(), on_buffered_amount_low_.AsStdFunction(),
kBufferedAmountLowThreshold, kBufferedAmountLowThreshold,
on_total_buffered_amount_low_.AsStdFunction()) {} on_total_buffered_amount_low_.AsStdFunction()) {}
@ -761,39 +759,5 @@ TEST_F(RRSendQueueTest, WillStayInAStreamAsLongAsThatMessageIsSending) {
EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value()); EXPECT_FALSE(buf_.Produce(kNow, kOneFragmentPacketSize).has_value());
} }
TEST_F(RRSendQueueTest, StreamsHaveInitialPriority) {
EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), kDefaultPriority);
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), kDefaultPriority);
}
TEST_F(RRSendQueueTest, CanChangeStreamPriority) {
buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
EXPECT_EQ(buf_.GetStreamPriority(StreamID(1)), StreamPriority(42));
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
EXPECT_EQ(buf_.GetStreamPriority(StreamID(2)), StreamPriority(42));
}
TEST_F(RRSendQueueTest, WillHandoverPriority) {
buf_.SetStreamPriority(StreamID(1), StreamPriority(42));
buf_.Add(kNow, DcSctpMessage(StreamID(2), kPPID, std::vector<uint8_t>(40)));
buf_.SetStreamPriority(StreamID(2), StreamPriority(42));
DcSctpSocketHandoverState state;
buf_.AddHandoverState(state);
RRSendQueue q2("log: ", kMaxQueueSize, kDefaultPriority,
on_buffered_amount_low_.AsStdFunction(),
kBufferedAmountLowThreshold,
on_total_buffered_amount_low_.AsStdFunction());
q2.RestoreFromState(state);
EXPECT_EQ(q2.GetStreamPriority(StreamID(1)), StreamPriority(42));
EXPECT_EQ(q2.GetStreamPriority(StreamID(2)), StreamPriority(42));
}
} // namespace } // namespace
} // namespace dcsctp } // namespace dcsctp