dcsctp: Add interleaved reassembly streams
This is the receive-side part of supporting what is frequently called "ndata", but actually RFC8260 - "User Message Interleaving". This CL adds a new ReassemblyStreams implementation that can assemble I-DATA chunks and process I-FORWARD-TSN for partial reliability. Bug: webrtc:5696 Change-Id: I3cfbea62e7b6c02fbd3f51b43ba3fb7863cf0f88 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/218506 Commit-Queue: Victor Boivie <boivie@webrtc.org> Reviewed-by: Harald Alvestrand <hta@webrtc.org> Cr-Commit-Position: refs/heads/main@{#37128}
This commit is contained in:

committed by
WebRTC LUCI CQ

parent
5f5bdf1880
commit
2a9bed3ee3
@ -44,6 +44,28 @@ rtc_source_set("reassembly_streams") {
|
|||||||
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
|
absl_deps = [ "//third_party/abseil-cpp/absl/strings" ]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rtc_library("interleaved_reassembly_streams") {
|
||||||
|
deps = [
|
||||||
|
":reassembly_streams",
|
||||||
|
"../../../api:array_view",
|
||||||
|
"../../../rtc_base",
|
||||||
|
"../../../rtc_base:checks",
|
||||||
|
"../../../rtc_base:logging",
|
||||||
|
"../common:sequence_numbers",
|
||||||
|
"../packet:chunk",
|
||||||
|
"../packet:data",
|
||||||
|
"../public:types",
|
||||||
|
]
|
||||||
|
sources = [
|
||||||
|
"interleaved_reassembly_streams.cc",
|
||||||
|
"interleaved_reassembly_streams.h",
|
||||||
|
]
|
||||||
|
absl_deps = [
|
||||||
|
"//third_party/abseil-cpp/absl/algorithm:container",
|
||||||
|
"//third_party/abseil-cpp/absl/strings",
|
||||||
|
"//third_party/abseil-cpp/absl/types:optional",
|
||||||
|
]
|
||||||
|
}
|
||||||
rtc_library("traditional_reassembly_streams") {
|
rtc_library("traditional_reassembly_streams") {
|
||||||
deps = [
|
deps = [
|
||||||
":reassembly_streams",
|
":reassembly_streams",
|
||||||
@ -68,6 +90,7 @@ rtc_library("traditional_reassembly_streams") {
|
|||||||
|
|
||||||
rtc_library("reassembly_queue") {
|
rtc_library("reassembly_queue") {
|
||||||
deps = [
|
deps = [
|
||||||
|
":interleaved_reassembly_streams",
|
||||||
":reassembly_streams",
|
":reassembly_streams",
|
||||||
":traditional_reassembly_streams",
|
":traditional_reassembly_streams",
|
||||||
"../../../api:array_view",
|
"../../../api:array_view",
|
||||||
@ -98,6 +121,7 @@ if (rtc_include_tests) {
|
|||||||
|
|
||||||
deps = [
|
deps = [
|
||||||
":data_tracker",
|
":data_tracker",
|
||||||
|
":interleaved_reassembly_streams",
|
||||||
":reassembly_queue",
|
":reassembly_queue",
|
||||||
":reassembly_streams",
|
":reassembly_streams",
|
||||||
":traditional_reassembly_streams",
|
":traditional_reassembly_streams",
|
||||||
@ -117,6 +141,7 @@ if (rtc_include_tests) {
|
|||||||
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
absl_deps = [ "//third_party/abseil-cpp/absl/types:optional" ]
|
||||||
sources = [
|
sources = [
|
||||||
"data_tracker_test.cc",
|
"data_tracker_test.cc",
|
||||||
|
"interleaved_reassembly_streams_test.cc",
|
||||||
"reassembly_queue_test.cc",
|
"reassembly_queue_test.cc",
|
||||||
"traditional_reassembly_streams_test.cc",
|
"traditional_reassembly_streams_test.cc",
|
||||||
]
|
]
|
||||||
|
270
net/dcsctp/rx/interleaved_reassembly_streams.cc
Normal file
270
net/dcsctp/rx/interleaved_reassembly_streams.cc
Normal file
@ -0,0 +1,270 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
|
||||||
|
|
||||||
|
#include <stddef.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <functional>
|
||||||
|
#include <iterator>
|
||||||
|
#include <map>
|
||||||
|
#include <numeric>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <utility>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
#include "absl/algorithm/container.h"
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "net/dcsctp/common/sequence_numbers.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/public/types.h"
|
||||||
|
#include "rtc_base/logging.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams::InterleavedReassemblyStreams(
|
||||||
|
absl::string_view log_prefix,
|
||||||
|
OnAssembledMessage on_assembled_message,
|
||||||
|
const DcSctpSocketHandoverState* handover_state)
|
||||||
|
: log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {
|
||||||
|
if (handover_state) {
|
||||||
|
for (const DcSctpSocketHandoverState::OrderedStream& state :
|
||||||
|
handover_state->rx.ordered_streams) {
|
||||||
|
FullStreamId stream_id(IsUnordered(false), StreamID(state.id));
|
||||||
|
streams_.emplace(
|
||||||
|
std::piecewise_construct, std::forward_as_tuple(stream_id),
|
||||||
|
std::forward_as_tuple(stream_id, this, MID(state.next_ssn)));
|
||||||
|
}
|
||||||
|
for (const DcSctpSocketHandoverState::UnorderedStream& state :
|
||||||
|
handover_state->rx.unordered_streams) {
|
||||||
|
FullStreamId stream_id(IsUnordered(true), StreamID(state.id));
|
||||||
|
streams_.emplace(std::piecewise_construct,
|
||||||
|
std::forward_as_tuple(stream_id),
|
||||||
|
std::forward_as_tuple(stream_id, this));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessage(
|
||||||
|
UnwrappedMID mid) {
|
||||||
|
std::map<UnwrappedMID, ChunkMap>::const_iterator it =
|
||||||
|
chunks_by_mid_.find(mid);
|
||||||
|
if (it == chunks_by_mid_.end()) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
|
||||||
|
<< *mid.Wrap() << " - no chunks";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
const ChunkMap& chunks = it->second;
|
||||||
|
if (!chunks.begin()->second.second.is_beginning ||
|
||||||
|
!chunks.rbegin()->second.second.is_end) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
|
||||||
|
<< *mid.Wrap() << "- missing beginning or end";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
int64_t fsn_diff = *chunks.rbegin()->first - *chunks.begin()->first;
|
||||||
|
if (fsn_diff != (static_cast<int64_t>(chunks.size()) - 1)) {
|
||||||
|
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
|
||||||
|
<< *mid.Wrap() << "- not all chunks exist (have "
|
||||||
|
<< chunks.size() << ", expect " << (fsn_diff + 1)
|
||||||
|
<< ")";
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t removed_bytes = AssembleMessage(chunks);
|
||||||
|
RTC_DLOG(LS_VERBOSE) << parent_.log_prefix_ << "TryToAssembleMessage "
|
||||||
|
<< *mid.Wrap() << " - succeeded and removed "
|
||||||
|
<< removed_bytes;
|
||||||
|
|
||||||
|
chunks_by_mid_.erase(mid);
|
||||||
|
return removed_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t InterleavedReassemblyStreams::Stream::AssembleMessage(
|
||||||
|
const ChunkMap& tsn_chunks) {
|
||||||
|
size_t count = tsn_chunks.size();
|
||||||
|
if (count == 1) {
|
||||||
|
// Fast path - zero-copy
|
||||||
|
const Data& data = tsn_chunks.begin()->second.second;
|
||||||
|
size_t payload_size = data.size();
|
||||||
|
UnwrappedTSN tsns[1] = {tsn_chunks.begin()->second.first};
|
||||||
|
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
|
||||||
|
parent_.on_assembled_message_(tsns, std::move(message));
|
||||||
|
return payload_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - will need to concatenate the payload.
|
||||||
|
std::vector<UnwrappedTSN> tsns;
|
||||||
|
tsns.reserve(count);
|
||||||
|
|
||||||
|
std::vector<uint8_t> payload;
|
||||||
|
size_t payload_size = absl::c_accumulate(
|
||||||
|
tsn_chunks, 0,
|
||||||
|
[](size_t v, const auto& p) { return v + p.second.second.size(); });
|
||||||
|
payload.reserve(payload_size);
|
||||||
|
|
||||||
|
for (auto& item : tsn_chunks) {
|
||||||
|
const UnwrappedTSN tsn = item.second.first;
|
||||||
|
const Data& data = item.second.second;
|
||||||
|
tsns.push_back(tsn);
|
||||||
|
payload.insert(payload.end(), data.payload.begin(), data.payload.end());
|
||||||
|
}
|
||||||
|
|
||||||
|
const Data& data = tsn_chunks.begin()->second.second;
|
||||||
|
|
||||||
|
DcSctpMessage message(data.stream_id, data.ppid, std::move(payload));
|
||||||
|
parent_.on_assembled_message_(tsns, std::move(message));
|
||||||
|
return payload_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t InterleavedReassemblyStreams::Stream::EraseTo(MID message_id) {
|
||||||
|
UnwrappedMID unwrapped_mid = mid_unwrapper_.Unwrap(message_id);
|
||||||
|
|
||||||
|
size_t removed_bytes = 0;
|
||||||
|
auto it = chunks_by_mid_.begin();
|
||||||
|
while (it != chunks_by_mid_.end() && it->first <= unwrapped_mid) {
|
||||||
|
removed_bytes += absl::c_accumulate(
|
||||||
|
it->second, 0,
|
||||||
|
[](size_t r2, const auto& q) { return r2 + q.second.second.size(); });
|
||||||
|
it = chunks_by_mid_.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!stream_id_.unordered) {
|
||||||
|
// For ordered streams, erasing a message might suddenly unblock that queue
|
||||||
|
// and allow it to deliver any following received messages.
|
||||||
|
if (unwrapped_mid >= next_mid_) {
|
||||||
|
next_mid_ = unwrapped_mid.next_value();
|
||||||
|
}
|
||||||
|
|
||||||
|
removed_bytes += TryToAssembleMessages();
|
||||||
|
}
|
||||||
|
|
||||||
|
return removed_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
int InterleavedReassemblyStreams::Stream::Add(UnwrappedTSN tsn, Data data) {
|
||||||
|
RTC_DCHECK_EQ(*data.is_unordered, *stream_id_.unordered);
|
||||||
|
RTC_DCHECK_EQ(*data.stream_id, *stream_id_.stream_id);
|
||||||
|
int queued_bytes = data.size();
|
||||||
|
UnwrappedMID mid = mid_unwrapper_.Unwrap(data.message_id);
|
||||||
|
FSN fsn = data.fsn;
|
||||||
|
auto [unused, inserted] =
|
||||||
|
chunks_by_mid_[mid].emplace(fsn, std::make_pair(tsn, std::move(data)));
|
||||||
|
if (!inserted) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (stream_id_.unordered) {
|
||||||
|
queued_bytes -= TryToAssembleMessage(mid);
|
||||||
|
} else {
|
||||||
|
if (mid == next_mid_) {
|
||||||
|
queued_bytes -= TryToAssembleMessages();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return queued_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t InterleavedReassemblyStreams::Stream::TryToAssembleMessages() {
|
||||||
|
size_t removed_bytes = 0;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
size_t removed_bytes_this_iter = TryToAssembleMessage(next_mid_);
|
||||||
|
if (removed_bytes_this_iter == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
removed_bytes += removed_bytes_this_iter;
|
||||||
|
next_mid_.Increment();
|
||||||
|
}
|
||||||
|
return removed_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InterleavedReassemblyStreams::Stream::AddHandoverState(
|
||||||
|
DcSctpSocketHandoverState& state) const {
|
||||||
|
if (stream_id_.unordered) {
|
||||||
|
DcSctpSocketHandoverState::UnorderedStream state_stream;
|
||||||
|
state_stream.id = stream_id_.stream_id.value();
|
||||||
|
state.rx.unordered_streams.push_back(std::move(state_stream));
|
||||||
|
} else {
|
||||||
|
DcSctpSocketHandoverState::OrderedStream state_stream;
|
||||||
|
state_stream.id = stream_id_.stream_id.value();
|
||||||
|
state_stream.next_ssn = next_mid_.Wrap().value();
|
||||||
|
state.rx.ordered_streams.push_back(std::move(state_stream));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams::Stream&
|
||||||
|
InterleavedReassemblyStreams::GetOrCreateStream(const FullStreamId& stream_id) {
|
||||||
|
auto it = streams_.find(stream_id);
|
||||||
|
if (it == streams_.end()) {
|
||||||
|
it =
|
||||||
|
streams_
|
||||||
|
.emplace(std::piecewise_construct, std::forward_as_tuple(stream_id),
|
||||||
|
std::forward_as_tuple(stream_id, this))
|
||||||
|
.first;
|
||||||
|
}
|
||||||
|
return it->second;
|
||||||
|
}
|
||||||
|
|
||||||
|
int InterleavedReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
|
||||||
|
return GetOrCreateStream(FullStreamId(data.is_unordered, data.stream_id))
|
||||||
|
.Add(tsn, std::move(data));
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t InterleavedReassemblyStreams::HandleForwardTsn(
|
||||||
|
UnwrappedTSN new_cumulative_ack_tsn,
|
||||||
|
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
|
||||||
|
size_t removed_bytes = 0;
|
||||||
|
for (const auto& skipped : skipped_streams) {
|
||||||
|
removed_bytes +=
|
||||||
|
GetOrCreateStream(FullStreamId(skipped.unordered, skipped.stream_id))
|
||||||
|
.EraseTo(skipped.message_id);
|
||||||
|
}
|
||||||
|
return removed_bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InterleavedReassemblyStreams::ResetStreams(
|
||||||
|
rtc::ArrayView<const StreamID> stream_ids) {
|
||||||
|
if (stream_ids.empty()) {
|
||||||
|
for (auto& entry : streams_) {
|
||||||
|
entry.second.Reset();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
for (StreamID stream_id : stream_ids) {
|
||||||
|
GetOrCreateStream(FullStreamId(IsUnordered(true), stream_id)).Reset();
|
||||||
|
GetOrCreateStream(FullStreamId(IsUnordered(false), stream_id)).Reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
HandoverReadinessStatus InterleavedReassemblyStreams::GetHandoverReadiness()
|
||||||
|
const {
|
||||||
|
HandoverReadinessStatus status;
|
||||||
|
for (const auto& [stream_id, stream] : streams_) {
|
||||||
|
if (stream.has_unassembled_chunks()) {
|
||||||
|
status.Add(
|
||||||
|
stream_id.unordered
|
||||||
|
? HandoverUnreadinessReason::kUnorderedStreamHasUnassembledChunks
|
||||||
|
: HandoverUnreadinessReason::kOrderedStreamHasUnassembledChunks);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
void InterleavedReassemblyStreams::AddHandoverState(
|
||||||
|
DcSctpSocketHandoverState& state) {
|
||||||
|
for (const auto& [unused, stream] : streams_) {
|
||||||
|
stream.AddHandoverState(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace dcsctp
|
111
net/dcsctp/rx/interleaved_reassembly_streams.h
Normal file
111
net/dcsctp/rx/interleaved_reassembly_streams.h
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#ifndef NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
|
||||||
|
#define NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <map>
|
||||||
|
#include <string>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "absl/strings/string_view.h"
|
||||||
|
#include "api/array_view.h"
|
||||||
|
#include "net/dcsctp/common/sequence_numbers.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
|
||||||
|
// Handles reassembly of incoming data when interleaved message sending is
|
||||||
|
// enabled on the association, i.e. when RFC8260 is in use.
|
||||||
|
class InterleavedReassemblyStreams : public ReassemblyStreams {
|
||||||
|
public:
|
||||||
|
InterleavedReassemblyStreams(
|
||||||
|
absl::string_view log_prefix,
|
||||||
|
OnAssembledMessage on_assembled_message,
|
||||||
|
const DcSctpSocketHandoverState* handover_state = nullptr);
|
||||||
|
|
||||||
|
int Add(UnwrappedTSN tsn, Data data) override;
|
||||||
|
|
||||||
|
size_t HandleForwardTsn(
|
||||||
|
UnwrappedTSN new_cumulative_ack_tsn,
|
||||||
|
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)
|
||||||
|
override;
|
||||||
|
|
||||||
|
void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
|
||||||
|
|
||||||
|
HandoverReadinessStatus GetHandoverReadiness() const override;
|
||||||
|
void AddHandoverState(DcSctpSocketHandoverState& state) override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct FullStreamId {
|
||||||
|
const IsUnordered unordered;
|
||||||
|
const StreamID stream_id;
|
||||||
|
|
||||||
|
FullStreamId(IsUnordered unordered, StreamID stream_id)
|
||||||
|
: unordered(unordered), stream_id(stream_id) {}
|
||||||
|
|
||||||
|
friend bool operator<(FullStreamId a, FullStreamId b) {
|
||||||
|
return a.unordered < b.unordered ||
|
||||||
|
(!(a.unordered < b.unordered) && (a.stream_id < b.stream_id));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
class Stream {
|
||||||
|
public:
|
||||||
|
Stream(FullStreamId stream_id,
|
||||||
|
InterleavedReassemblyStreams* parent,
|
||||||
|
MID next_mid = MID(0))
|
||||||
|
: stream_id_(stream_id),
|
||||||
|
parent_(*parent),
|
||||||
|
next_mid_(mid_unwrapper_.Unwrap(next_mid)) {}
|
||||||
|
int Add(UnwrappedTSN tsn, Data data);
|
||||||
|
size_t EraseTo(MID message_id);
|
||||||
|
void Reset() {
|
||||||
|
mid_unwrapper_.Reset();
|
||||||
|
next_mid_ = mid_unwrapper_.Unwrap(MID(0));
|
||||||
|
}
|
||||||
|
bool has_unassembled_chunks() const { return !chunks_by_mid_.empty(); }
|
||||||
|
void AddHandoverState(DcSctpSocketHandoverState& state) const;
|
||||||
|
|
||||||
|
private:
|
||||||
|
using ChunkMap = std::map<FSN, std::pair<UnwrappedTSN, Data>>;
|
||||||
|
|
||||||
|
// Try to assemble one message identified by `mid`.
|
||||||
|
// Returns the number of bytes assembled if a message was assembled.
|
||||||
|
size_t TryToAssembleMessage(UnwrappedMID mid);
|
||||||
|
size_t AssembleMessage(const ChunkMap& tsn_chunks);
|
||||||
|
// Try to assemble one or several messages in order from the stream.
|
||||||
|
// Returns the number of bytes assembled if one or more messages were
|
||||||
|
// assembled.
|
||||||
|
size_t TryToAssembleMessages();
|
||||||
|
|
||||||
|
const FullStreamId stream_id_;
|
||||||
|
InterleavedReassemblyStreams& parent_;
|
||||||
|
std::map<UnwrappedMID, ChunkMap> chunks_by_mid_;
|
||||||
|
UnwrappedMID::Unwrapper mid_unwrapper_;
|
||||||
|
UnwrappedMID next_mid_;
|
||||||
|
};
|
||||||
|
|
||||||
|
Stream& GetOrCreateStream(const FullStreamId& stream_id);
|
||||||
|
|
||||||
|
const std::string log_prefix_;
|
||||||
|
|
||||||
|
// Callback for when a message has been assembled.
|
||||||
|
const OnAssembledMessage on_assembled_message_;
|
||||||
|
|
||||||
|
// All unordered and ordered streams, managing not-yet-assembled data.
|
||||||
|
std::map<FullStreamId, Stream> streams_;
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace dcsctp
|
||||||
|
|
||||||
|
#endif // NET_DCSCTP_RX_INTERLEAVED_REASSEMBLY_STREAMS_H_
|
154
net/dcsctp/rx/interleaved_reassembly_streams_test.cc
Normal file
154
net/dcsctp/rx/interleaved_reassembly_streams_test.cc
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
#include <memory>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
#include "net/dcsctp/common/sequence_numbers.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
|
||||||
|
#include "net/dcsctp/packet/chunk/iforward_tsn_chunk.h"
|
||||||
|
#include "net/dcsctp/packet/data.h"
|
||||||
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
|
#include "net/dcsctp/testing/data_generator.h"
|
||||||
|
#include "rtc_base/gunit.h"
|
||||||
|
#include "test/gmock.h"
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
namespace {
|
||||||
|
using ::testing::MockFunction;
|
||||||
|
using ::testing::NiceMock;
|
||||||
|
|
||||||
|
class InterleavedReassemblyStreamsTest : public testing::Test {
|
||||||
|
protected:
|
||||||
|
UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); }
|
||||||
|
|
||||||
|
InterleavedReassemblyStreamsTest() {}
|
||||||
|
DataGenerator gen_;
|
||||||
|
UnwrappedTSN::Unwrapper tsn_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
AddUnorderedMessageReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
|
||||||
|
// Adding the end fragment should make it empty again.
|
||||||
|
EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
AddSimpleOrderedMessageReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
AddMoreComplexOrderedMessageReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
|
||||||
|
Data late = gen_.Ordered({2, 3, 4});
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
DeleteUnorderedMessageReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
|
||||||
|
|
||||||
|
IForwardTsnChunk::SkippedStream skipped[] = {
|
||||||
|
IForwardTsnChunk::SkippedStream(IsUnordered(true), StreamID(1), MID(0))};
|
||||||
|
EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
DeleteSimpleOrderedMessageReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
|
||||||
|
|
||||||
|
IForwardTsnChunk::SkippedStream skipped[] = {
|
||||||
|
IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))};
|
||||||
|
EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
DeleteManyOrderedMessagesReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
|
||||||
|
gen_.Ordered({2, 3, 4});
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
|
||||||
|
|
||||||
|
// Expire all three messages
|
||||||
|
IForwardTsnChunk::SkippedStream skipped[] = {
|
||||||
|
IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(2))};
|
||||||
|
EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(InterleavedReassemblyStreamsTest,
|
||||||
|
DeleteOrderedMessageDelivesTwoReturnsCorrectSize) {
|
||||||
|
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
|
||||||
|
|
||||||
|
InterleavedReassemblyStreams streams("", on_assembled.AsStdFunction());
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
|
||||||
|
gen_.Ordered({2, 3, 4});
|
||||||
|
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
|
||||||
|
|
||||||
|
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
|
||||||
|
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
|
||||||
|
|
||||||
|
// The first ordered message expire, and the following two are delivered.
|
||||||
|
IForwardTsnChunk::SkippedStream skipped[] = {
|
||||||
|
IForwardTsnChunk::SkippedStream(IsUnordered(false), StreamID(1), MID(0))};
|
||||||
|
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
} // namespace dcsctp
|
@ -29,15 +29,32 @@
|
|||||||
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
|
#include "net/dcsctp/packet/parameter/outgoing_ssn_reset_request_parameter.h"
|
||||||
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
|
#include "net/dcsctp/packet/parameter/reconfiguration_response_parameter.h"
|
||||||
#include "net/dcsctp/public/dcsctp_message.h"
|
#include "net/dcsctp/public/dcsctp_message.h"
|
||||||
|
#include "net/dcsctp/rx/interleaved_reassembly_streams.h"
|
||||||
#include "net/dcsctp/rx/reassembly_streams.h"
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
|
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
|
||||||
#include "rtc_base/logging.h"
|
#include "rtc_base/logging.h"
|
||||||
|
|
||||||
namespace dcsctp {
|
namespace dcsctp {
|
||||||
|
namespace {
|
||||||
|
std::unique_ptr<ReassemblyStreams> CreateStreams(
|
||||||
|
absl::string_view log_prefix,
|
||||||
|
ReassemblyStreams::OnAssembledMessage on_assembled_message,
|
||||||
|
bool use_message_interleaving,
|
||||||
|
const DcSctpSocketHandoverState* handover_state) {
|
||||||
|
if (use_message_interleaving) {
|
||||||
|
return std::make_unique<InterleavedReassemblyStreams>(
|
||||||
|
log_prefix, std::move(on_assembled_message), handover_state);
|
||||||
|
}
|
||||||
|
return std::make_unique<TraditionalReassemblyStreams>(
|
||||||
|
log_prefix, std::move(on_assembled_message), handover_state);
|
||||||
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
ReassemblyQueue::ReassemblyQueue(
|
ReassemblyQueue::ReassemblyQueue(
|
||||||
absl::string_view log_prefix,
|
absl::string_view log_prefix,
|
||||||
TSN peer_initial_tsn,
|
TSN peer_initial_tsn,
|
||||||
size_t max_size_bytes,
|
size_t max_size_bytes,
|
||||||
|
bool use_message_interleaving,
|
||||||
const DcSctpSocketHandoverState* handover_state)
|
const DcSctpSocketHandoverState* handover_state)
|
||||||
: log_prefix_(std::string(log_prefix) + "reasm: "),
|
: log_prefix_(std::string(log_prefix) + "reasm: "),
|
||||||
max_size_bytes_(max_size_bytes),
|
max_size_bytes_(max_size_bytes),
|
||||||
@ -50,12 +67,13 @@ ReassemblyQueue::ReassemblyQueue(
|
|||||||
? ReconfigRequestSN(
|
? ReconfigRequestSN(
|
||||||
handover_state->rx.last_completed_deferred_reset_req_sn)
|
handover_state->rx.last_completed_deferred_reset_req_sn)
|
||||||
: ReconfigRequestSN(0)),
|
: ReconfigRequestSN(0)),
|
||||||
streams_(std::make_unique<TraditionalReassemblyStreams>(
|
streams_(CreateStreams(
|
||||||
log_prefix_,
|
log_prefix_,
|
||||||
[this](rtc::ArrayView<const UnwrappedTSN> tsns,
|
[this](rtc::ArrayView<const UnwrappedTSN> tsns,
|
||||||
DcSctpMessage message) {
|
DcSctpMessage message) {
|
||||||
AddReassembledMessage(tsns, std::move(message));
|
AddReassembledMessage(tsns, std::move(message));
|
||||||
},
|
},
|
||||||
|
use_message_interleaving,
|
||||||
handover_state)) {}
|
handover_state)) {}
|
||||||
|
|
||||||
void ReassemblyQueue::Add(TSN tsn, Data data) {
|
void ReassemblyQueue::Add(TSN tsn, Data data) {
|
||||||
|
@ -72,6 +72,7 @@ class ReassemblyQueue {
|
|||||||
ReassemblyQueue(absl::string_view log_prefix,
|
ReassemblyQueue(absl::string_view log_prefix,
|
||||||
TSN peer_initial_tsn,
|
TSN peer_initial_tsn,
|
||||||
size_t max_size_bytes,
|
size_t max_size_bytes,
|
||||||
|
bool use_message_interleaving = false,
|
||||||
const DcSctpSocketHandoverState* handover_state = nullptr);
|
const DcSctpSocketHandoverState* handover_state = nullptr);
|
||||||
|
|
||||||
// Adds a data chunk to the queue, with a `tsn` and other parameters in
|
// Adds a data chunk to the queue, with a `tsn` and other parameters in
|
||||||
|
@ -33,6 +33,7 @@ namespace dcsctp {
|
|||||||
namespace {
|
namespace {
|
||||||
using ::testing::ElementsAre;
|
using ::testing::ElementsAre;
|
||||||
using ::testing::SizeIs;
|
using ::testing::SizeIs;
|
||||||
|
using ::testing::UnorderedElementsAre;
|
||||||
|
|
||||||
// The default maximum size of the Reassembly Queue.
|
// The default maximum size of the Reassembly Queue.
|
||||||
static constexpr size_t kBufferSize = 10000;
|
static constexpr size_t kBufferSize = 10000;
|
||||||
@ -45,6 +46,11 @@ static constexpr PPID kPPID(53);
|
|||||||
|
|
||||||
static constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4};
|
static constexpr std::array<uint8_t, 4> kShortPayload = {1, 2, 3, 4};
|
||||||
static constexpr std::array<uint8_t, 4> kMessage2Payload = {5, 6, 7, 8};
|
static constexpr std::array<uint8_t, 4> kMessage2Payload = {5, 6, 7, 8};
|
||||||
|
static constexpr std::array<uint8_t, 6> kSixBytePayload = {1, 2, 3, 4, 5, 6};
|
||||||
|
static constexpr std::array<uint8_t, 8> kMediumPayload1 = {1, 2, 3, 4,
|
||||||
|
5, 6, 7, 8};
|
||||||
|
static constexpr std::array<uint8_t, 8> kMediumPayload2 = {9, 10, 11, 12,
|
||||||
|
13, 14, 15, 16};
|
||||||
static constexpr std::array<uint8_t, 16> kLongPayload = {
|
static constexpr std::array<uint8_t, 16> kLongPayload = {
|
||||||
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
|
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
|
||||||
|
|
||||||
@ -369,7 +375,8 @@ TEST_F(ReassemblyQueueTest, HandoverInInitialState) {
|
|||||||
DcSctpSocketHandoverState state;
|
DcSctpSocketHandoverState state;
|
||||||
reasm1.AddHandoverState(state);
|
reasm1.AddHandoverState(state);
|
||||||
g_handover_state_transformer_for_test(&state);
|
g_handover_state_transformer_for_test(&state);
|
||||||
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
|
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/false, &state);
|
||||||
|
|
||||||
reasm2.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
|
reasm2.Add(TSN(10), gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||||
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
|
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
|
||||||
@ -384,7 +391,8 @@ TEST_F(ReassemblyQueueTest, HandoverAfterHavingAssembedOneMessage) {
|
|||||||
DcSctpSocketHandoverState state;
|
DcSctpSocketHandoverState state;
|
||||||
reasm1.AddHandoverState(state);
|
reasm1.AddHandoverState(state);
|
||||||
g_handover_state_transformer_for_test(&state);
|
g_handover_state_transformer_for_test(&state);
|
||||||
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize, &state);
|
ReassemblyQueue reasm2("log: ", TSN(100), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/false, &state);
|
||||||
|
|
||||||
reasm2.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE"));
|
reasm2.Add(TSN(11), gen_.Ordered({1, 2, 3, 4}, "BE"));
|
||||||
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
|
EXPECT_THAT(reasm2.FlushMessages(), SizeIs(1));
|
||||||
@ -405,5 +413,95 @@ TEST_F(ReassemblyQueueTest, HandleInconsistentForwardTSN) {
|
|||||||
// Don't assemble SSN=7, as that TSN is skipped.
|
// Don't assemble SSN=7, as that TSN is skipped.
|
||||||
EXPECT_FALSE(reasm.HasMessages());
|
EXPECT_FALSE(reasm.HasMessages());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, SingleUnorderedChunkMessageInRfc8260) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/true);
|
||||||
|
reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
|
||||||
|
{1, 2, 3, 4}, Data::IsBeginning(true),
|
||||||
|
Data::IsEnd(true), IsUnordered(true)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kShortPayload)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, TwoInterleavedChunks) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/true);
|
||||||
|
reasm.Add(TSN(10), Data(StreamID(1), SSN(0), MID(0), FSN(0), kPPID,
|
||||||
|
{1, 2, 3, 4}, Data::IsBeginning(true),
|
||||||
|
Data::IsEnd(false), IsUnordered(true)));
|
||||||
|
reasm.Add(TSN(11), Data(StreamID(2), SSN(0), MID(0), FSN(0), kPPID,
|
||||||
|
{9, 10, 11, 12}, Data::IsBeginning(true),
|
||||||
|
Data::IsEnd(false), IsUnordered(true)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 8u);
|
||||||
|
reasm.Add(TSN(12), Data(StreamID(1), SSN(0), MID(0), FSN(1), kPPID,
|
||||||
|
{5, 6, 7, 8}, Data::IsBeginning(false),
|
||||||
|
Data::IsEnd(true), IsUnordered(true)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 4u);
|
||||||
|
reasm.Add(TSN(13), Data(StreamID(2), SSN(0), MID(0), FSN(1), kPPID,
|
||||||
|
{13, 14, 15, 16}, Data::IsBeginning(false),
|
||||||
|
Data::IsEnd(true), IsUnordered(true)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(StreamID(1), kPPID, kMediumPayload1),
|
||||||
|
SctpMessageIs(StreamID(2), kPPID, kMediumPayload2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, UnorderedInterleavedMessagesAllPermutations) {
|
||||||
|
std::vector<int> indexes = {0, 1, 2, 3, 4, 5};
|
||||||
|
TSN tsns[] = {TSN(10), TSN(11), TSN(12), TSN(13), TSN(14), TSN(15)};
|
||||||
|
StreamID stream_ids[] = {StreamID(1), StreamID(2), StreamID(1),
|
||||||
|
StreamID(1), StreamID(2), StreamID(2)};
|
||||||
|
FSN fsns[] = {FSN(0), FSN(0), FSN(1), FSN(2), FSN(1), FSN(2)};
|
||||||
|
rtc::ArrayView<const uint8_t> payload(kSixBytePayload);
|
||||||
|
do {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/true);
|
||||||
|
for (int i : indexes) {
|
||||||
|
auto span = payload.subview(*fsns[i] * 2, 2);
|
||||||
|
Data::IsBeginning is_beginning(fsns[i] == FSN(0));
|
||||||
|
Data::IsEnd is_end(fsns[i] == FSN(2));
|
||||||
|
reasm.Add(tsns[i], Data(stream_ids[i], SSN(0), MID(0), fsns[i], kPPID,
|
||||||
|
std::vector<uint8_t>(span.begin(), span.end()),
|
||||||
|
is_beginning, is_end, IsUnordered(true)));
|
||||||
|
}
|
||||||
|
EXPECT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
UnorderedElementsAre(
|
||||||
|
SctpMessageIs(StreamID(1), kPPID, kSixBytePayload),
|
||||||
|
SctpMessageIs(StreamID(2), kPPID, kSixBytePayload)));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
} while (std::next_permutation(std::begin(indexes), std::end(indexes)));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ReassemblyQueueTest, IForwardTSNRemoveALotOrdered) {
|
||||||
|
ReassemblyQueue reasm("log: ", TSN(10), kBufferSize,
|
||||||
|
/*use_message_interleaving=*/true);
|
||||||
|
reasm.Add(TSN(10), gen_.Ordered({1}, "B"));
|
||||||
|
gen_.Ordered({2}, "");
|
||||||
|
reasm.Add(TSN(12), gen_.Ordered({3}, ""));
|
||||||
|
reasm.Add(TSN(13), gen_.Ordered({4}, "E"));
|
||||||
|
reasm.Add(TSN(15), gen_.Ordered({5}, "B"));
|
||||||
|
reasm.Add(TSN(16), gen_.Ordered({6}, ""));
|
||||||
|
reasm.Add(TSN(17), gen_.Ordered({7}, ""));
|
||||||
|
reasm.Add(TSN(18), gen_.Ordered({8}, "E"));
|
||||||
|
|
||||||
|
ASSERT_FALSE(reasm.HasMessages());
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 7u);
|
||||||
|
|
||||||
|
reasm.Handle(
|
||||||
|
IForwardTsnChunk(TSN(13), {IForwardTsnChunk::SkippedStream(
|
||||||
|
IsUnordered(false), kStreamID, MID(0))}));
|
||||||
|
EXPECT_EQ(reasm.queued_bytes(), 0u);
|
||||||
|
|
||||||
|
// The lost chunk comes, but too late.
|
||||||
|
ASSERT_TRUE(reasm.HasMessages());
|
||||||
|
EXPECT_THAT(reasm.FlushMessages(),
|
||||||
|
ElementsAre(SctpMessageIs(kStreamID, kPPID, kMessage2Payload)));
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
} // namespace dcsctp
|
} // namespace dcsctp
|
||||||
|
55
net/dcsctp/rx/reassembly_streams.cc
Normal file
55
net/dcsctp/rx/reassembly_streams.cc
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
/*
|
||||||
|
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
|
||||||
|
*
|
||||||
|
* Use of this source code is governed by a BSD-style license
|
||||||
|
* that can be found in the LICENSE file in the root of the source
|
||||||
|
* tree. An additional intellectual property rights grant can be found
|
||||||
|
* in the file PATENTS. All contributing project authors may
|
||||||
|
* be found in the AUTHORS file in the root of the source tree.
|
||||||
|
*/
|
||||||
|
#include "net/dcsctp/rx/reassembly_streams.h"
|
||||||
|
|
||||||
|
#include <cstddef>
|
||||||
|
#include <map>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
namespace dcsctp {
|
||||||
|
|
||||||
|
ReassembledMessage AssembleMessage(std::map<UnwrappedTSN, Data>::iterator start,
|
||||||
|
std::map<UnwrappedTSN, Data>::iterator end) {
|
||||||
|
size_t count = std::distance(start, end);
|
||||||
|
|
||||||
|
if (count == 1) {
|
||||||
|
// Fast path - zero-copy
|
||||||
|
Data& data = start->second;
|
||||||
|
|
||||||
|
return ReassembledMessage{
|
||||||
|
.tsns = {start->first},
|
||||||
|
.message = DcSctpMessage(data.stream_id, data.ppid,
|
||||||
|
std::move(start->second.payload)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slow path - will need to concatenate the payload.
|
||||||
|
std::vector<UnwrappedTSN> tsns;
|
||||||
|
std::vector<uint8_t> payload;
|
||||||
|
|
||||||
|
size_t payload_size = std::accumulate(
|
||||||
|
start, end, 0,
|
||||||
|
[](size_t v, const auto& p) { return v + p.second.size(); });
|
||||||
|
|
||||||
|
tsns.reserve(count);
|
||||||
|
payload.reserve(payload_size);
|
||||||
|
for (auto it = start; it != end; ++it) {
|
||||||
|
Data& data = it->second;
|
||||||
|
tsns.push_back(it->first);
|
||||||
|
payload.insert(payload.end(), data.payload.begin(), data.payload.end());
|
||||||
|
}
|
||||||
|
|
||||||
|
return ReassembledMessage{
|
||||||
|
.tsns = std::move(tsns),
|
||||||
|
.message = DcSctpMessage(start->second.stream_id, start->second.ppid,
|
||||||
|
std::move(payload)),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} // namespace dcsctp
|
@ -98,6 +98,7 @@ class TransmissionControlBlock : public Context {
|
|||||||
reassembly_queue_(log_prefix,
|
reassembly_queue_(log_prefix,
|
||||||
peer_initial_tsn,
|
peer_initial_tsn,
|
||||||
options.max_receiver_window_buffer_size,
|
options.max_receiver_window_buffer_size,
|
||||||
|
capabilities.message_interleaving,
|
||||||
handover_state),
|
handover_state),
|
||||||
retransmission_queue_(
|
retransmission_queue_(
|
||||||
log_prefix,
|
log_prefix,
|
||||||
|
Reference in New Issue
Block a user