dcsctp: Add Send Queue
When the client asks for a message to be sent, it's put in the SendQueue, which is available even when the socket is not yet connected. When the socket is connected, those messages will be sent on the wire, possibly fragmented if the message is large enough to not fit inside a single packet. When the message has been fully sent, it's removed from the send queue (but it will be in the RetransmissionQueue - which is added in a follow-up change, until the message has been ACKed). The Send Queue is a FIFO queue in this iteration, and in SCTP, that's called a "First Come, First Served" queue, or FCFS. In follow-up work, the queue and the actual scheduling algorithm which decides which message that is sent, when there are messages in multiple streams, will likely be decoupled. But in this iteration, they're in the same class. Bug: webrtc:12614 Change-Id: Iec1183e625499a21e402e4f2a5ebcf989bc5c3ec Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214044 Reviewed-by: Harald Alvestrand <hta@webrtc.org> Commit-Queue: Victor Boivie <boivie@webrtc.org> Cr-Commit-Position: refs/heads/master@{#33798}
This commit is contained in:
committed by
Commit Bot
parent
67b80ac5b2
commit
762f21ce8d
@ -18,6 +18,7 @@ if (rtc_include_tests) {
|
||||
"public:dcsctp_public_unittests",
|
||||
"rx:dcsctp_rx_unittests",
|
||||
"timer:dcsctp_timer_unittests",
|
||||
"tx:dcsctp_tx_unittests",
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@
|
||||
#include <utility>
|
||||
|
||||
#include "net/dcsctp/public/strong_alias.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
|
||||
namespace dcsctp {
|
||||
|
||||
@ -34,5 +35,13 @@ using ReconfigRequestSN = StrongAlias<class ReconfigRequestSNTag, uint32_t>;
|
||||
// Verification Tag, used for packet validation.
|
||||
using VerificationTag = StrongAlias<class VerificationTagTag, uint32_t>;
|
||||
|
||||
// Hasher for separated ordered/unordered stream identifiers.
|
||||
struct UnorderedStreamHash {
|
||||
size_t operator()(const std::pair<IsUnordered, StreamID>& p) const {
|
||||
return std::hash<IsUnordered::UnderlyingType>{}(*p.first) ^
|
||||
(std::hash<StreamID::UnderlyingType>{}(*p.second) << 1);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace dcsctp
|
||||
#endif // NET_DCSCTP_COMMON_INTERNAL_TYPES_H_
|
||||
|
||||
53
net/dcsctp/tx/BUILD.gn
Normal file
53
net/dcsctp/tx/BUILD.gn
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
#
|
||||
# Use of this source code is governed by a BSD-style license
|
||||
# that can be found in the LICENSE file in the root of the source
|
||||
# tree. An additional intellectual property rights grant can be found
|
||||
# in the file PATENTS. All contributing project authors may
|
||||
# be found in the AUTHORS file in the root of the source tree.
|
||||
|
||||
import("../../../webrtc.gni")
|
||||
|
||||
rtc_source_set("send_queue") {
|
||||
deps = [
|
||||
"../common:internal_types",
|
||||
"../packet:chunk",
|
||||
"../public:types",
|
||||
]
|
||||
sources = [ "send_queue.h" ]
|
||||
}
|
||||
|
||||
rtc_library("fcfs_send_queue") {
|
||||
deps = [
|
||||
":send_queue",
|
||||
"../../../api:array_view",
|
||||
"../../../rtc_base:checks",
|
||||
"../../../rtc_base:rtc_base_approved",
|
||||
]
|
||||
sources = [
|
||||
"fcfs_send_queue.cc",
|
||||
"fcfs_send_queue.h",
|
||||
]
|
||||
}
|
||||
|
||||
if (rtc_include_tests) {
|
||||
rtc_source_set("mock_send_queue") {
|
||||
testonly = true
|
||||
deps = [ ":send_queue" ]
|
||||
sources = [ "mock_send_queue.h" ]
|
||||
}
|
||||
|
||||
rtc_library("dcsctp_tx_unittests") {
|
||||
testonly = true
|
||||
|
||||
deps = [
|
||||
":fcfs_send_queue",
|
||||
"../../../api:array_view",
|
||||
"../../../rtc_base:checks",
|
||||
"../../../rtc_base:gunit_helpers",
|
||||
"../../../rtc_base:rtc_base_approved",
|
||||
"../../../test:test_support",
|
||||
]
|
||||
sources = [ "fcfs_send_queue_test.cc" ]
|
||||
}
|
||||
}
|
||||
247
net/dcsctp/tx/fcfs_send_queue.cc
Normal file
247
net/dcsctp/tx/fcfs_send_queue.cc
Normal file
@ -0,0 +1,247 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#include "net/dcsctp/tx/fcfs_send_queue.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/algorithm/container.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/dcsctp_message.h"
|
||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
#include "rtc_base/logging.h"
|
||||
|
||||
namespace dcsctp {
|
||||
void FCFSSendQueue::Add(TimeMs now,
|
||||
DcSctpMessage message,
|
||||
const SendOptions& send_options) {
|
||||
RTC_DCHECK(!message.payload().empty());
|
||||
std::deque<Item>& queue =
|
||||
IsPaused(message.stream_id()) ? paused_items_ : items_;
|
||||
// Any limited lifetime should start counting from now - when the message
|
||||
// has been added to the queue.
|
||||
absl::optional<TimeMs> expires_at = absl::nullopt;
|
||||
if (send_options.lifetime.has_value()) {
|
||||
expires_at = now + *send_options.lifetime;
|
||||
}
|
||||
queue.emplace_back(std::move(message), expires_at, send_options);
|
||||
}
|
||||
|
||||
size_t FCFSSendQueue::total_bytes() const {
|
||||
// TODO(boivie): Have the current size as a member variable, so that's it not
|
||||
// calculated for every operation.
|
||||
return absl::c_accumulate(items_, 0,
|
||||
[](size_t size, const Item& item) {
|
||||
return size + item.remaining_size;
|
||||
}) +
|
||||
absl::c_accumulate(paused_items_, 0,
|
||||
[](size_t size, const Item& item) {
|
||||
return size + item.remaining_size;
|
||||
});
|
||||
}
|
||||
|
||||
bool FCFSSendQueue::IsFull() const {
|
||||
return total_bytes() >= buffer_size_;
|
||||
}
|
||||
|
||||
bool FCFSSendQueue::IsEmpty() const {
|
||||
return items_.empty();
|
||||
}
|
||||
|
||||
FCFSSendQueue::Item* FCFSSendQueue::GetFirstNonExpiredMessage(TimeMs now) {
|
||||
while (!items_.empty()) {
|
||||
FCFSSendQueue::Item& item = items_.front();
|
||||
// An entire item can be discarded iff:
|
||||
// 1) It hasn't been partially sent (has been allocated a message_id).
|
||||
// 2) It has a non-negative expiry time.
|
||||
// 3) And that expiry time has passed.
|
||||
if (!item.message_id.has_value() && item.expires_at.has_value() &&
|
||||
*item.expires_at <= now) {
|
||||
// TODO(boivie): This should be reported to the client.
|
||||
RTC_DLOG(LS_VERBOSE)
|
||||
<< log_prefix_
|
||||
<< "Message is expired before even partially sent - discarding";
|
||||
items_.pop_front();
|
||||
continue;
|
||||
}
|
||||
|
||||
return &item;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
absl::optional<SendQueue::DataToSend> FCFSSendQueue::Produce(TimeMs now,
|
||||
size_t max_size) {
|
||||
Item* item = GetFirstNonExpiredMessage(now);
|
||||
if (item == nullptr) {
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
DcSctpMessage& message = item->message;
|
||||
|
||||
// Don't make too small fragments as that can result in increased risk of
|
||||
// failure to assemble a message if a small fragment is missing.
|
||||
if (item->remaining_size > max_size && max_size < kMinimumFragmentedPayload) {
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Will not fragment "
|
||||
<< item->remaining_size << " bytes into buffer of "
|
||||
<< max_size << " bytes";
|
||||
return absl::nullopt;
|
||||
}
|
||||
|
||||
// Allocate Message ID and SSN when the first fragment is sent.
|
||||
if (!item->message_id.has_value()) {
|
||||
MID& mid =
|
||||
mid_by_stream_id_[{item->send_options.unordered, message.stream_id()}];
|
||||
item->message_id = mid;
|
||||
mid = MID(*mid + 1);
|
||||
}
|
||||
if (!item->send_options.unordered && !item->ssn.has_value()) {
|
||||
SSN& ssn = ssn_by_stream_id_[message.stream_id()];
|
||||
item->ssn = ssn;
|
||||
ssn = SSN(*ssn + 1);
|
||||
}
|
||||
|
||||
// Grab the next `max_size` fragment from this message and calculate flags.
|
||||
rtc::ArrayView<const uint8_t> chunk_payload =
|
||||
item->message.payload().subview(item->remaining_offset, max_size);
|
||||
rtc::ArrayView<const uint8_t> message_payload = message.payload();
|
||||
Data::IsBeginning is_beginning(chunk_payload.data() ==
|
||||
message_payload.data());
|
||||
Data::IsEnd is_end((chunk_payload.data() + chunk_payload.size()) ==
|
||||
(message_payload.data() + message_payload.size()));
|
||||
|
||||
StreamID stream_id = message.stream_id();
|
||||
PPID ppid = message.ppid();
|
||||
|
||||
// Zero-copy the payload if the message fits in a single chunk.
|
||||
std::vector<uint8_t> payload =
|
||||
is_beginning && is_end
|
||||
? std::move(message).ReleasePayload()
|
||||
: std::vector<uint8_t>(chunk_payload.begin(), chunk_payload.end());
|
||||
|
||||
FSN fsn(item->current_fsn);
|
||||
item->current_fsn = FSN(*item->current_fsn + 1);
|
||||
|
||||
SendQueue::DataToSend chunk(Data(stream_id, item->ssn.value_or(SSN(0)),
|
||||
item->message_id.value(), fsn, ppid,
|
||||
std::move(payload), is_beginning, is_end,
|
||||
item->send_options.unordered));
|
||||
chunk.max_retransmissions = item->send_options.max_retransmissions;
|
||||
chunk.expires_at = item->expires_at;
|
||||
|
||||
if (is_end) {
|
||||
// The entire message has been sent, and its last data copied to `chunk`, so
|
||||
// it can safely be discarded.
|
||||
items_.pop_front();
|
||||
} else {
|
||||
item->remaining_offset += chunk_payload.size();
|
||||
item->remaining_size -= chunk_payload.size();
|
||||
RTC_DCHECK(item->remaining_offset + item->remaining_size ==
|
||||
item->message.payload().size());
|
||||
RTC_DCHECK(item->remaining_size > 0);
|
||||
}
|
||||
RTC_DLOG(LS_VERBOSE) << log_prefix_ << "tx-msg: Producing chunk of "
|
||||
<< chunk.data.size() << " bytes (max: " << max_size
|
||||
<< ")";
|
||||
return chunk;
|
||||
}
|
||||
|
||||
void FCFSSendQueue::Discard(IsUnordered unordered,
|
||||
StreamID stream_id,
|
||||
MID message_id) {
|
||||
// As this method will only discard partially sent messages, and as the queue
|
||||
// is a FIFO queue, the only partially sent message would be the topmost
|
||||
// message.
|
||||
if (!items_.empty()) {
|
||||
Item& item = items_.front();
|
||||
if (item.send_options.unordered == unordered &&
|
||||
item.message.stream_id() == stream_id && item.message_id.has_value() &&
|
||||
*item.message_id == message_id) {
|
||||
items_.pop_front();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void FCFSSendQueue::PrepareResetStreams(
|
||||
rtc::ArrayView<const StreamID> streams) {
|
||||
for (StreamID stream_id : streams) {
|
||||
paused_streams_.insert(stream_id);
|
||||
}
|
||||
|
||||
// Will not discard partially sent messages - only whole messages. Partially
|
||||
// delivered messages (at the time of receiving a Stream Reset command) will
|
||||
// always deliver all the fragments before actually resetting the stream.
|
||||
for (auto it = items_.begin(); it != items_.end();) {
|
||||
if (IsPaused(it->message.stream_id()) && it->remaining_offset == 0) {
|
||||
it = items_.erase(it);
|
||||
} else {
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool FCFSSendQueue::CanResetStreams() const {
|
||||
for (auto& item : items_) {
|
||||
if (IsPaused(item.message.stream_id())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void FCFSSendQueue::CommitResetStreams() {
|
||||
for (StreamID stream_id : paused_streams_) {
|
||||
ssn_by_stream_id_[stream_id] = SSN(0);
|
||||
// https://tools.ietf.org/html/rfc8260#section-2.3.2
|
||||
// "When an association resets the SSN using the SCTP extension defined
|
||||
// in [RFC6525], the two counters (one for the ordered messages, one for
|
||||
// the unordered messages) used for the MIDs MUST be reset to 0."
|
||||
mid_by_stream_id_[{IsUnordered(false), stream_id}] = MID(0);
|
||||
mid_by_stream_id_[{IsUnordered(true), stream_id}] = MID(0);
|
||||
}
|
||||
RollbackResetStreams();
|
||||
}
|
||||
|
||||
void FCFSSendQueue::RollbackResetStreams() {
|
||||
while (!paused_items_.empty()) {
|
||||
items_.push_back(std::move(paused_items_.front()));
|
||||
paused_items_.pop_front();
|
||||
}
|
||||
paused_streams_.clear();
|
||||
}
|
||||
|
||||
void FCFSSendQueue::Reset() {
|
||||
if (!items_.empty()) {
|
||||
// If this message has been partially sent, reset it so that it will be
|
||||
// re-sent.
|
||||
auto& item = items_.front();
|
||||
item.remaining_offset = 0;
|
||||
item.remaining_size = item.message.payload().size();
|
||||
item.message_id = absl::nullopt;
|
||||
item.ssn = absl::nullopt;
|
||||
item.current_fsn = FSN(0);
|
||||
}
|
||||
RollbackResetStreams();
|
||||
mid_by_stream_id_.clear();
|
||||
ssn_by_stream_id_.clear();
|
||||
}
|
||||
|
||||
bool FCFSSendQueue::IsPaused(StreamID stream_id) const {
|
||||
return paused_streams_.find(stream_id) != paused_streams_.end();
|
||||
}
|
||||
|
||||
} // namespace dcsctp
|
||||
123
net/dcsctp/tx/fcfs_send_queue.h
Normal file
123
net/dcsctp/tx/fcfs_send_queue.h
Normal file
@ -0,0 +1,123 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#ifndef NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
|
||||
#define NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
|
||||
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/algorithm/container.h"
|
||||
#include "absl/strings/string_view.h"
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/common/pair_hash.h"
|
||||
#include "net/dcsctp/public/dcsctp_message.h"
|
||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
|
||||
namespace dcsctp {
|
||||
|
||||
// The FCFSSendQueue (First-Come, First-Served Send Queue) holds all messages
|
||||
// that the client wants to send, but that haven't yet been split into chunks
|
||||
// and sent on the wire.
|
||||
//
|
||||
// First-Come, First Served means that it passes the data in the exact same
|
||||
// order as they were delivered by the calling application, and is defined in
|
||||
// https://tools.ietf.org/html/rfc8260#section-3.1. It's a FIFO queue, but that
|
||||
// term isn't used in this RFC.
|
||||
//
|
||||
// As messages can be (requested to be) sent before
|
||||
// the connection is properly established, this send queue is always present -
|
||||
// even for closed connections.
|
||||
class FCFSSendQueue : public SendQueue {
|
||||
public:
|
||||
// How small a data chunk's payload may be, if having to fragment a message.
|
||||
static constexpr size_t kMinimumFragmentedPayload = 10;
|
||||
|
||||
FCFSSendQueue(absl::string_view log_prefix, size_t buffer_size)
|
||||
: log_prefix_(std::string(log_prefix) + "fcfs: "),
|
||||
buffer_size_(buffer_size) {}
|
||||
|
||||
// Indicates if the buffer is full. Note that it's up to the caller to ensure
|
||||
// that the buffer is not full prior to adding new items to it.
|
||||
bool IsFull() const;
|
||||
// Indicates if the buffer is empty.
|
||||
bool IsEmpty() const;
|
||||
|
||||
// Adds the message to be sent using the `send_options` provided. The current
|
||||
// time should be in `now`. Note that it's the responsibility of the caller to
|
||||
// ensure that the buffer is not full (by calling `IsFull`) before adding
|
||||
// messages to it.
|
||||
void Add(TimeMs now,
|
||||
DcSctpMessage message,
|
||||
const SendOptions& send_options = {});
|
||||
|
||||
// Implementation of `SendQueue`.
|
||||
absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) override;
|
||||
void Discard(IsUnordered unordered,
|
||||
StreamID stream_id,
|
||||
MID message_id) override;
|
||||
void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) override;
|
||||
bool CanResetStreams() const override;
|
||||
void CommitResetStreams() override;
|
||||
void RollbackResetStreams() override;
|
||||
void Reset() override;
|
||||
|
||||
// The size of the buffer, in "payload bytes".
|
||||
size_t total_bytes() const;
|
||||
|
||||
private:
|
||||
// An enqueued message and metadata.
|
||||
struct Item {
|
||||
explicit Item(DcSctpMessage msg,
|
||||
absl::optional<TimeMs> expires_at,
|
||||
const SendOptions& send_options)
|
||||
: message(std::move(msg)),
|
||||
expires_at(expires_at),
|
||||
send_options(send_options),
|
||||
remaining_offset(0),
|
||||
remaining_size(message.payload().size()) {}
|
||||
DcSctpMessage message;
|
||||
absl::optional<TimeMs> expires_at;
|
||||
SendOptions send_options;
|
||||
// The remaining payload (offset and size) to be sent, when it has been
|
||||
// fragmented.
|
||||
size_t remaining_offset;
|
||||
size_t remaining_size;
|
||||
// If set, an allocated Message ID and SSN. Will be allocated when the first
|
||||
// fragment is sent.
|
||||
absl::optional<MID> message_id = absl::nullopt;
|
||||
absl::optional<SSN> ssn = absl::nullopt;
|
||||
// The current Fragment Sequence Number, incremented for each fragment.
|
||||
FSN current_fsn = FSN(0);
|
||||
};
|
||||
|
||||
Item* GetFirstNonExpiredMessage(TimeMs now);
|
||||
bool IsPaused(StreamID stream_id) const;
|
||||
|
||||
const std::string log_prefix_;
|
||||
const size_t buffer_size_;
|
||||
std::deque<Item> items_;
|
||||
|
||||
std::unordered_set<StreamID, StreamID::Hasher> paused_streams_;
|
||||
std::deque<Item> paused_items_;
|
||||
|
||||
std::unordered_map<std::pair<IsUnordered, StreamID>, MID, UnorderedStreamHash>
|
||||
mid_by_stream_id_;
|
||||
std::unordered_map<StreamID, SSN, StreamID::Hasher> ssn_by_stream_id_;
|
||||
};
|
||||
} // namespace dcsctp
|
||||
|
||||
#endif // NET_DCSCTP_TX_FCFS_SEND_QUEUE_H_
|
||||
361
net/dcsctp/tx/fcfs_send_queue_test.cc
Normal file
361
net/dcsctp/tx/fcfs_send_queue_test.cc
Normal file
@ -0,0 +1,361 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#include "net/dcsctp/tx/fcfs_send_queue.h"
|
||||
|
||||
#include <cstdint>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/dcsctp_message.h"
|
||||
#include "net/dcsctp/public/dcsctp_options.h"
|
||||
#include "net/dcsctp/public/dcsctp_socket.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
#include "rtc_base/gunit.h"
|
||||
#include "test/gmock.h"
|
||||
|
||||
namespace dcsctp {
|
||||
namespace {
|
||||
|
||||
constexpr TimeMs kNow = TimeMs(0);
|
||||
constexpr StreamID kStreamID(1);
|
||||
constexpr PPID kPPID(53);
|
||||
|
||||
class FCFSSendQueueTest : public testing::Test {
|
||||
protected:
|
||||
FCFSSendQueueTest() : buf_("log: ", 100) {}
|
||||
|
||||
const DcSctpOptions options_;
|
||||
FCFSSendQueue buf_;
|
||||
};
|
||||
|
||||
TEST_F(FCFSSendQueueTest, EmptyBuffer) {
|
||||
EXPECT_TRUE(buf_.IsEmpty());
|
||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, AddAndGetSingleChunk) {
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 4, 5, 6}));
|
||||
|
||||
EXPECT_FALSE(buf_.IsEmpty());
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
absl::optional<SendQueue::DataToSend> chunk_opt = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_opt.has_value());
|
||||
EXPECT_TRUE(chunk_opt->data.is_beginning);
|
||||
EXPECT_TRUE(chunk_opt->data.is_end);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, CarveOutBeginningMiddleAndEnd) {
|
||||
std::vector<uint8_t> payload(60);
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_beg =
|
||||
buf_.Produce(kNow, /*max_size=*/20);
|
||||
ASSERT_TRUE(chunk_beg.has_value());
|
||||
EXPECT_TRUE(chunk_beg->data.is_beginning);
|
||||
EXPECT_FALSE(chunk_beg->data.is_end);
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_mid =
|
||||
buf_.Produce(kNow, /*max_size=*/20);
|
||||
ASSERT_TRUE(chunk_mid.has_value());
|
||||
EXPECT_FALSE(chunk_mid->data.is_beginning);
|
||||
EXPECT_FALSE(chunk_mid->data.is_end);
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_end =
|
||||
buf_.Produce(kNow, /*max_size=*/20);
|
||||
ASSERT_TRUE(chunk_end.has_value());
|
||||
EXPECT_FALSE(chunk_end->data.is_beginning);
|
||||
EXPECT_TRUE(chunk_end->data.is_end);
|
||||
|
||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, GetChunksFromTwoMessages) {
|
||||
std::vector<uint8_t> payload(60);
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||
EXPECT_TRUE(chunk_one->data.is_beginning);
|
||||
EXPECT_TRUE(chunk_one->data.is_end);
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
||||
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
||||
EXPECT_TRUE(chunk_two->data.is_beginning);
|
||||
EXPECT_TRUE(chunk_two->data.is_end);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, BufferBecomesFullAndEmptied) {
|
||||
std::vector<uint8_t> payload(60);
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(3), PPID(54), payload));
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
// However, it's still possible to add messages. It's a soft limit, and it
|
||||
// might be necessary to forcefully add messages due to e.g. external
|
||||
// fragmentation.
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(5), PPID(55), payload));
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||
|
||||
EXPECT_TRUE(buf_.IsFull());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(3));
|
||||
EXPECT_EQ(chunk_two->data.ppid, PPID(54));
|
||||
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
EXPECT_FALSE(buf_.IsEmpty());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_three.has_value());
|
||||
EXPECT_EQ(chunk_three->data.stream_id, StreamID(5));
|
||||
EXPECT_EQ(chunk_three->data.ppid, PPID(55));
|
||||
|
||||
EXPECT_FALSE(buf_.IsFull());
|
||||
EXPECT_TRUE(buf_.IsEmpty());
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, WillNotSendTooSmallPacket) {
|
||||
std::vector<uint8_t> payload(FCFSSendQueue::kMinimumFragmentedPayload + 1);
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
// Wouldn't fit enough payload (wouldn't want to fragment)
|
||||
EXPECT_FALSE(
|
||||
buf_.Produce(kNow,
|
||||
/*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload - 1)
|
||||
.has_value());
|
||||
|
||||
// Minimum fragment
|
||||
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||
buf_.Produce(kNow,
|
||||
/*max_size=*/FCFSSendQueue::kMinimumFragmentedPayload);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(chunk_one->data.ppid, kPPID);
|
||||
|
||||
// There is only one byte remaining - it can be fetched as it doesn't require
|
||||
// additional fragmentation.
|
||||
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||
buf_.Produce(kNow, /*max_size=*/1);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(chunk_two->data.ppid, kPPID);
|
||||
|
||||
EXPECT_TRUE(buf_.IsEmpty());
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, DefaultsToOrderedSend) {
|
||||
std::vector<uint8_t> payload(20);
|
||||
|
||||
// Default is ordered
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
absl::optional<SendQueue::DataToSend> chunk_one =
|
||||
buf_.Produce(kNow, /*max_size=*/100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_FALSE(chunk_one->data.is_unordered);
|
||||
|
||||
// Explicitly unordered.
|
||||
SendOptions opts;
|
||||
opts.unordered = IsUnordered(true);
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload), opts);
|
||||
absl::optional<SendQueue::DataToSend> chunk_two =
|
||||
buf_.Produce(kNow, /*max_size=*/100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_TRUE(chunk_two->data.is_unordered);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, ProduceWithLifetimeExpiry) {
|
||||
std::vector<uint8_t> payload(20);
|
||||
|
||||
// Default is no expiry
|
||||
TimeMs now = kNow;
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
now = now + DurationMs(1000000);
|
||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
||||
|
||||
SendOptions expires_2_seconds;
|
||||
expires_2_seconds.lifetime = DurationMs(2000);
|
||||
|
||||
// Add and consume within lifetime
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||
now = now + DurationMs(1999);
|
||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
||||
|
||||
// Add and consume just outside lifetime
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||
now = now + DurationMs(2000);
|
||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
||||
|
||||
// A long time after expiry
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||
now = now + DurationMs(1000000);
|
||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
||||
|
||||
// Expire one message, but produce the second that is not expired.
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_2_seconds);
|
||||
|
||||
SendOptions expires_4_seconds;
|
||||
expires_4_seconds.lifetime = DurationMs(4000);
|
||||
|
||||
buf_.Add(now, DcSctpMessage(kStreamID, kPPID, payload), expires_4_seconds);
|
||||
now = now + DurationMs(2000);
|
||||
|
||||
ASSERT_TRUE(buf_.Produce(now, 100));
|
||||
ASSERT_FALSE(buf_.Produce(now, 100));
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, DiscardPartialPackets) {
|
||||
std::vector<uint8_t> payload(120);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_FALSE(chunk_one->data.is_end);
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
||||
chunk_one->data.message_id);
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_FALSE(chunk_two->data.is_end);
|
||||
EXPECT_EQ(chunk_two->data.stream_id, StreamID(2));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_three.has_value());
|
||||
EXPECT_TRUE(chunk_three->data.is_end);
|
||||
EXPECT_EQ(chunk_three->data.stream_id, StreamID(2));
|
||||
ASSERT_FALSE(buf_.Produce(kNow, 100));
|
||||
|
||||
// Calling it again shouldn't cause issues.
|
||||
buf_.Discard(IsUnordered(false), chunk_one->data.stream_id,
|
||||
chunk_one->data.message_id);
|
||||
ASSERT_FALSE(buf_.Produce(kNow, 100));
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, PrepareResetStreamsDiscardsStream) {
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, {1, 2, 3}));
|
||||
buf_.Add(kNow, DcSctpMessage(StreamID(2), PPID(54), {1, 2, 3, 4, 5}));
|
||||
EXPECT_EQ(buf_.total_bytes(), 8u);
|
||||
|
||||
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
|
||||
EXPECT_EQ(buf_.total_bytes(), 5u);
|
||||
buf_.CommitResetStreams();
|
||||
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(2)}));
|
||||
EXPECT_EQ(buf_.total_bytes(), 0u);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, PrepareResetStreamsNotPartialPackets) {
|
||||
std::vector<uint8_t> payload(120);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(buf_.total_bytes(), 2 * payload.size() - 50);
|
||||
|
||||
StreamID stream_ids[] = {StreamID(1)};
|
||||
buf_.PrepareResetStreams(stream_ids);
|
||||
EXPECT_EQ(buf_.total_bytes(), payload.size() - 50);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, EnqueuedItemsArePausedDuringStreamReset) {
|
||||
std::vector<uint8_t> payload(50);
|
||||
|
||||
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
|
||||
EXPECT_EQ(buf_.total_bytes(), 0u);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
||||
|
||||
EXPECT_FALSE(buf_.Produce(kNow, 100).has_value());
|
||||
buf_.CommitResetStreams();
|
||||
EXPECT_EQ(buf_.total_bytes(), payload.size());
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 50);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.stream_id, kStreamID);
|
||||
EXPECT_EQ(buf_.total_bytes(), 0u);
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, CommittingResetsSSN) {
|
||||
std::vector<uint8_t> payload(50);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
||||
|
||||
StreamID stream_ids[] = {StreamID(1)};
|
||||
buf_.PrepareResetStreams(stream_ids);
|
||||
|
||||
// Buffered
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
EXPECT_TRUE(buf_.CanResetStreams());
|
||||
buf_.CommitResetStreams();
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_three.has_value());
|
||||
EXPECT_EQ(chunk_three->data.ssn, SSN(0));
|
||||
}
|
||||
|
||||
TEST_F(FCFSSendQueueTest, RollBackResumesSSN) {
|
||||
std::vector<uint8_t> payload(50);
|
||||
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_one = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_one.has_value());
|
||||
EXPECT_EQ(chunk_one->data.ssn, SSN(0));
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_two = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_two.has_value());
|
||||
EXPECT_EQ(chunk_two->data.ssn, SSN(1));
|
||||
|
||||
buf_.PrepareResetStreams(std::vector<StreamID>({StreamID(1)}));
|
||||
|
||||
// Buffered
|
||||
buf_.Add(kNow, DcSctpMessage(kStreamID, kPPID, payload));
|
||||
|
||||
EXPECT_TRUE(buf_.CanResetStreams());
|
||||
buf_.RollbackResetStreams();
|
||||
|
||||
absl::optional<SendQueue::DataToSend> chunk_three = buf_.Produce(kNow, 100);
|
||||
ASSERT_TRUE(chunk_three.has_value());
|
||||
EXPECT_EQ(chunk_three->data.ssn, SSN(2));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
} // namespace dcsctp
|
||||
50
net/dcsctp/tx/mock_send_queue.h
Normal file
50
net/dcsctp/tx/mock_send_queue.h
Normal file
@ -0,0 +1,50 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#ifndef NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
|
||||
#define NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
|
||||
|
||||
#include <cstdint>
|
||||
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/tx/send_queue.h"
|
||||
#include "test/gmock.h"
|
||||
|
||||
namespace dcsctp {
|
||||
|
||||
class MockSendQueue : public SendQueue {
|
||||
public:
|
||||
MockSendQueue() {
|
||||
ON_CALL(*this, Produce).WillByDefault([](TimeMs now, size_t max_size) {
|
||||
return absl::nullopt;
|
||||
});
|
||||
}
|
||||
|
||||
MOCK_METHOD(absl::optional<SendQueue::DataToSend>,
|
||||
Produce,
|
||||
(TimeMs now, size_t max_size),
|
||||
(override));
|
||||
MOCK_METHOD(void,
|
||||
Discard,
|
||||
(IsUnordered unordered, StreamID stream_id, MID message_id),
|
||||
(override));
|
||||
MOCK_METHOD(void,
|
||||
PrepareResetStreams,
|
||||
(rtc::ArrayView<const StreamID> streams),
|
||||
(override));
|
||||
MOCK_METHOD(bool, CanResetStreams, (), (const, override));
|
||||
MOCK_METHOD(void, CommitResetStreams, (), (override));
|
||||
MOCK_METHOD(void, RollbackResetStreams, (), (override));
|
||||
MOCK_METHOD(void, Reset, (), (override));
|
||||
};
|
||||
|
||||
} // namespace dcsctp
|
||||
|
||||
#endif // NET_DCSCTP_TX_MOCK_SEND_QUEUE_H_
|
||||
111
net/dcsctp/tx/send_queue.h
Normal file
111
net/dcsctp/tx/send_queue.h
Normal file
@ -0,0 +1,111 @@
|
||||
/*
|
||||
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||
*
|
||||
* Use of this source code is governed by a BSD-style license
|
||||
* that can be found in the LICENSE file in the root of the source
|
||||
* tree. An additional intellectual property rights grant can be found
|
||||
* in the file PATENTS. All contributing project authors may
|
||||
* be found in the AUTHORS file in the root of the source tree.
|
||||
*/
|
||||
#ifndef NET_DCSCTP_TX_SEND_QUEUE_H_
|
||||
#define NET_DCSCTP_TX_SEND_QUEUE_H_
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
#include "absl/types/optional.h"
|
||||
#include "api/array_view.h"
|
||||
#include "net/dcsctp/common/internal_types.h"
|
||||
#include "net/dcsctp/packet/data.h"
|
||||
#include "net/dcsctp/public/types.h"
|
||||
|
||||
namespace dcsctp {
|
||||
|
||||
class SendQueue {
|
||||
public:
|
||||
// Container for a data chunk that is produced by the SendQueue
|
||||
struct DataToSend {
|
||||
explicit DataToSend(Data data) : data(std::move(data)) {}
|
||||
// The data to send, including all parameters.
|
||||
Data data;
|
||||
|
||||
// Partial reliability - RFC3758
|
||||
absl::optional<int> max_retransmissions;
|
||||
absl::optional<TimeMs> expires_at;
|
||||
};
|
||||
|
||||
virtual ~SendQueue() = default;
|
||||
|
||||
// TODO(boivie): This interface is obviously missing an "Add" function, but
|
||||
// that is postponed a bit until the story around how to model message
|
||||
// prioritization, which is important for any advanced stream scheduler, is
|
||||
// further clarified.
|
||||
|
||||
// Produce a chunk to be sent.
|
||||
//
|
||||
// `max_size` refers to how many payload bytes that may be produced, not
|
||||
// including any headers.
|
||||
virtual absl::optional<DataToSend> Produce(TimeMs now, size_t max_size) = 0;
|
||||
|
||||
// Discards a partially sent message identified by the parameters `unordered`,
|
||||
// `stream_id` and `message_id`. The `message_id` comes from the returned
|
||||
// information when having called `Produce`. A partially sent message means
|
||||
// that it has had at least one fragment of it returned when `Produce` was
|
||||
// called prior to calling this method).
|
||||
//
|
||||
// This is used when a message has been found to be expired (by the partial
|
||||
// reliability extension), and the retransmission queue will signal the
|
||||
// receiver that any partially received message fragments should be skipped.
|
||||
// This means that any remaining fragments in the Send Queue must be removed
|
||||
// as well so that they are not sent.
|
||||
virtual void Discard(IsUnordered unordered,
|
||||
StreamID stream_id,
|
||||
MID message_id) = 0;
|
||||
|
||||
// Prepares the streams to be reset. This is used to close a WebRTC data
|
||||
// channel and will be signaled to the other side.
|
||||
//
|
||||
// Concretely, it discards all whole (not partly sent) messages in the given
|
||||
// streams and pauses those streams so that future added messages aren't
|
||||
// produced until `ResumeStreams` is called.
|
||||
//
|
||||
// TODO(boivie): Investigate if it really should discard any message at all.
|
||||
// RFC8831 only mentions that "[RFC6525] also guarantees that all the messages
|
||||
// are delivered (or abandoned) before the stream is reset."
|
||||
//
|
||||
// This method can be called multiple times to add more streams to be
|
||||
// reset, and paused while they are resetting. This is the first part of the
|
||||
// two-phase commit protocol to reset streams, where the caller completes the
|
||||
// procedure by either calling `CommitResetStreams` or `RollbackResetStreams`.
|
||||
virtual void PrepareResetStreams(rtc::ArrayView<const StreamID> streams) = 0;
|
||||
|
||||
// Returns true if all non-discarded messages during `PrepareResetStreams`
|
||||
// (which are those that was partially sent before that method was called)
|
||||
// have been sent.
|
||||
virtual bool CanResetStreams() const = 0;
|
||||
|
||||
// Called to commit to reset the streams provided to `PrepareResetStreams`.
|
||||
// It will reset the stream sequence numbers (SSNs) and message identifiers
|
||||
// (MIDs) and resume the paused streams.
|
||||
virtual void CommitResetStreams() = 0;
|
||||
|
||||
// Called to abort the resetting of streams provided to `PrepareResetStreams`.
|
||||
// Will resume the paused streams without resetting the stream sequence
|
||||
// numbers (SSNs) or message identifiers (MIDs). Note that the non-partial
|
||||
// messages that were discarded when calling `PrepareResetStreams` will not be
|
||||
// recovered, to better match the intention from the sender to "close the
|
||||
// channel".
|
||||
virtual void RollbackResetStreams() = 0;
|
||||
|
||||
// Resets all message identifier counters (MID, SSN) and makes all partially
|
||||
// messages be ready to be re-sent in full. This is used when the peer has
|
||||
// been detected to have restarted and is used to try to minimize the amount
|
||||
// of data loss. However, data loss cannot be completely guaranteed when a
|
||||
// peer restarts.
|
||||
virtual void Reset() = 0;
|
||||
};
|
||||
} // namespace dcsctp
|
||||
|
||||
#endif // NET_DCSCTP_TX_SEND_QUEUE_H_
|
||||
Reference in New Issue
Block a user