dcsctp: Add Traditional Reassembly Streams

This class handles the assembly of fragmented received messages (as DATA
chunks) and manage per-stream queues. This class only handles
non-interleaved messages as described in RFC4960, and is not used when
message interleaving is enabled on the association, as described in
RFC8260.

This is also only part of the reassembly - a follow-up change will add
the ReassemblyQueue that handle the other part as well. And an even
further follow-up change will add a "interleaved reassembly stream".

Bug: webrtc:12614
Change-Id: Iaf339fa215a2b14926f5cb74f15528392e273f99
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/214042
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Reviewed-by: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33677}
This commit is contained in:
Victor Boivie
2021-04-03 19:39:46 +02:00
committed by Commit Bot
parent b2d539be6b
commit 8a13d2ca9f
5 changed files with 669 additions and 1 deletions

View File

@ -21,18 +21,42 @@ rtc_library("data_tracker") {
]
}
rtc_source_set("reassembly_streams") {
deps = [ "../packet:chunk" ]
sources = [ "reassembly_streams.h" ]
}
rtc_library("traditional_reassembly_streams") {
deps = [
":reassembly_streams",
"../../../api:array_view",
"../../../rtc_base",
"../../../rtc_base:checks",
"../../../rtc_base:rtc_base_approved",
]
sources = [
"traditional_reassembly_streams.cc",
"traditional_reassembly_streams.h",
]
}
if (rtc_include_tests) {
rtc_library("dcsctp_rx_unittests") {
testonly = true
deps = [
":data_tracker",
":traditional_reassembly_streams",
"../../../api:array_view",
"../../../rtc_base:checks",
"../../../rtc_base:gunit_helpers",
"../../../rtc_base:rtc_base_approved",
"../../../test:test_support",
"../testing:data_generator",
]
sources = [
"data_tracker_test.cc",
"traditional_reassembly_streams_test.cc",
]
sources = [ "data_tracker_test.cc" ]
}
}

View File

@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
#define NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <vector>
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
namespace dcsctp {
// Implementations of this interface will be called when data is received, when
// data should be skipped/forgotten or when sequence number should be reset.
//
// As a result of these operations - mainly when data is received - the
// implementations of this interface should notify when a message has been
// assembled, by calling the provided callback of type `OnAssembledMessage`. How
// it assembles messages will depend on e.g. if a message was sent on an ordered
// or unordered stream.
//
// Implementations will - for each operation - indicate how much additional
// memory that has been used as a result of performing the operation. This is
// used to limit the maximum amount of memory used, to prevent out-of-memory
// situations.
class ReassemblyStreams {
public:
// This callback will be provided as an argument to the constructor of the
// concrete class implementing this interface and should be called when a
// message has been assembled as well as indicating from which TSNs this
// message was assembled from.
using OnAssembledMessage =
std::function<void(rtc::ArrayView<const UnwrappedTSN> tsns,
DcSctpMessage message)>;
virtual ~ReassemblyStreams() = default;
// Adds a data chunk to a stream as identified in `data`.
// If it was the last remaining chunk in a message, reassemble one (or
// several, in case of ordered chunks) messages.
//
// Returns the additional number of bytes added to the queue as a result of
// performing this operation. If this addition resulted in messages being
// assembled and delivered, this may be negative.
virtual int Add(UnwrappedTSN tsn, Data data) = 0;
// Called for incoming FORWARD-TSN/I-FORWARD-TSN chunks - when the sender
// wishes the received to skip/forget about data up until the provided TSN.
// This is used to implement partial reliability, such as limiting the number
// of retransmissions or the an expiration duration. As a result of skipping
// data, this may result in the implementation being able to assemble messages
// in ordered streams.
//
// Returns the number of bytes removed from the queue as a result of
// this operation.
virtual size_t HandleForwardTsn(
UnwrappedTSN new_cumulative_ack_tsn,
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream>
skipped_streams) = 0;
// Called for incoming (possibly deferred) RE_CONFIG chunks asking for
// either a few streams, or all streams (when the list is empty) to be
// reset - to have their next SSN or Message ID to be zero.
virtual void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) = 0;
};
} // namespace dcsctp
#endif // NET_DCSCTP_RX_REASSEMBLY_STREAMS_H_

View File

@ -0,0 +1,289 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include <stddef.h>
#include <cstdint>
#include <functional>
#include <iterator>
#include <map>
#include <numeric>
#include <unordered_map>
#include <utility>
#include <vector>
#include "absl/algorithm/container.h"
#include "absl/types/optional.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/public/dcsctp_message.h"
#include "rtc_base/logging.h"
namespace dcsctp {
namespace {
// Given a map (`chunks`) and an iterator to within that map (`iter`), this
// function will return an iterator to the first chunk in that message, which
// has the `is_beginning` flag set. If there are any gaps, or if the beginning
// can't be found, `absl::nullopt` is returned.
absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindBeginning(
const std::map<UnwrappedTSN, Data>& chunks,
std::map<UnwrappedTSN, Data>::iterator iter) {
UnwrappedTSN prev_tsn = iter->first;
for (;;) {
if (iter->second.is_beginning) {
return iter;
}
if (iter == chunks.begin()) {
return absl::nullopt;
}
--iter;
if (iter->first.next_value() != prev_tsn) {
return absl::nullopt;
}
prev_tsn = iter->first;
}
}
// Given a map (`chunks`) and an iterator to within that map (`iter`), this
// function will return an iterator to the chunk after the last chunk in that
// message, which has the `is_end` flag set. If there are any gaps, or if the
// end can't be found, `absl::nullopt` is returned.
absl::optional<std::map<UnwrappedTSN, Data>::iterator> FindEnd(
std::map<UnwrappedTSN, Data>& chunks,
std::map<UnwrappedTSN, Data>::iterator iter) {
UnwrappedTSN prev_tsn = iter->first;
for (;;) {
if (iter->second.is_end) {
return ++iter;
}
++iter;
if (iter == chunks.end()) {
return absl::nullopt;
}
if (iter->first != prev_tsn.next_value()) {
return absl::nullopt;
}
prev_tsn = iter->first;
}
}
} // namespace
int TraditionalReassemblyStreams::UnorderedStream::Add(UnwrappedTSN tsn,
Data data) {
int queued_bytes = data.size();
auto p = chunks_.emplace(tsn, std::move(data));
if (!p.second /* !inserted */) {
return 0;
}
queued_bytes -= TryToAssembleMessage(p.first);
return queued_bytes;
}
size_t TraditionalReassemblyStreams::UnorderedStream::TryToAssembleMessage(
ChunkMap::iterator iter) {
// TODO(boivie): This method is O(N) with the number of fragments in a
// message, which can be inefficient for very large values of N. This could be
// optimized by e.g. only trying to assemble a message once _any_ beginning
// and _any_ end has been found.
absl::optional<ChunkMap::iterator> start = FindBeginning(chunks_, iter);
if (!start.has_value()) {
return 0;
}
absl::optional<ChunkMap::iterator> end = FindEnd(chunks_, iter);
if (!end.has_value()) {
return 0;
}
size_t bytes_assembled = AssembleMessage(*start, *end);
chunks_.erase(*start, *end);
return bytes_assembled;
}
size_t TraditionalReassemblyStreams::StreamBase::AssembleMessage(
const ChunkMap::iterator start,
const ChunkMap::iterator end) {
size_t count = std::distance(start, end);
if (count == 1) {
// Fast path - zero-copy
const Data& data = start->second;
size_t payload_size = start->second.size();
UnwrappedTSN tsns[1] = {start->first};
DcSctpMessage message(data.stream_id, data.ppid, std::move(data.payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
}
// Slow path - will need to concatenate the payload.
std::vector<UnwrappedTSN> tsns;
std::vector<uint8_t> payload;
size_t payload_size = std::accumulate(
start, end, 0,
[](size_t v, const auto& p) { return v + p.second.size(); });
tsns.reserve(count);
payload.reserve(payload_size);
for (auto it = start; it != end; ++it) {
const Data& data = it->second;
tsns.push_back(it->first);
payload.insert(payload.end(), data.payload.begin(), data.payload.end());
}
DcSctpMessage message(start->second.stream_id, start->second.ppid,
std::move(payload));
parent_.on_assembled_message_(tsns, std::move(message));
return payload_size;
}
size_t TraditionalReassemblyStreams::UnorderedStream::EraseTo(
UnwrappedTSN tsn) {
auto end_iter = chunks_.upper_bound(tsn);
size_t removed_bytes = std::accumulate(
chunks_.begin(), end_iter, 0,
[](size_t r, const auto& p) { return r + p.second.size(); });
chunks_.erase(chunks_.begin(), end_iter);
return removed_bytes;
}
size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessage() {
if (chunks_by_ssn_.empty() || chunks_by_ssn_.begin()->first != next_ssn_) {
return 0;
}
ChunkMap& chunks = chunks_by_ssn_.begin()->second;
if (!chunks.begin()->second.is_beginning || !chunks.rbegin()->second.is_end) {
return 0;
}
uint32_t tsn_diff = chunks.rbegin()->first.Difference(chunks.begin()->first);
if (tsn_diff != chunks.size() - 1) {
return 0;
}
size_t assembled_bytes = AssembleMessage(chunks.begin(), chunks.end());
chunks_by_ssn_.erase(chunks_by_ssn_.begin());
next_ssn_.Increment();
return assembled_bytes;
}
size_t TraditionalReassemblyStreams::OrderedStream::TryToAssembleMessages() {
size_t assembled_bytes = 0;
for (;;) {
size_t assembled_bytes_this_iter = TryToAssembleMessage();
if (assembled_bytes_this_iter == 0) {
break;
}
assembled_bytes += assembled_bytes_this_iter;
}
return assembled_bytes;
}
int TraditionalReassemblyStreams::OrderedStream::Add(UnwrappedTSN tsn,
Data data) {
int queued_bytes = data.size();
UnwrappedSSN ssn = ssn_unwrapper_.Unwrap(data.ssn);
auto p = chunks_by_ssn_[ssn].emplace(tsn, std::move(data));
if (!p.second /* !inserted */) {
return 0;
}
if (ssn == next_ssn_) {
queued_bytes -= TryToAssembleMessages();
}
return queued_bytes;
}
size_t TraditionalReassemblyStreams::OrderedStream::EraseTo(SSN ssn) {
UnwrappedSSN unwrapped_ssn = ssn_unwrapper_.Unwrap(ssn);
auto end_iter = chunks_by_ssn_.upper_bound(unwrapped_ssn);
size_t removed_bytes = std::accumulate(
chunks_by_ssn_.begin(), end_iter, 0, [](size_t r1, const auto& p) {
return r1 +
absl::c_accumulate(p.second, 0, [](size_t r2, const auto& q) {
return r2 + q.second.size();
});
});
chunks_by_ssn_.erase(chunks_by_ssn_.begin(), end_iter);
if (unwrapped_ssn >= next_ssn_) {
unwrapped_ssn.Increment();
next_ssn_ = unwrapped_ssn;
}
removed_bytes += TryToAssembleMessages();
return removed_bytes;
}
int TraditionalReassemblyStreams::Add(UnwrappedTSN tsn, Data data) {
if (data.is_unordered) {
auto it = unordered_streams_.emplace(data.stream_id, this).first;
return it->second.Add(tsn, std::move(data));
}
auto it = ordered_streams_.emplace(data.stream_id, this).first;
return it->second.Add(tsn, std::move(data));
}
size_t TraditionalReassemblyStreams::HandleForwardTsn(
UnwrappedTSN new_cumulative_ack_tsn,
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams) {
size_t bytes_removed = 0;
// The `skipped_streams` only over ordered messages - need to
// iterate all unordered streams manually to remove those chunks.
for (auto& entry : unordered_streams_) {
bytes_removed += entry.second.EraseTo(new_cumulative_ack_tsn);
}
for (const auto& skipped_stream : skipped_streams) {
auto it = ordered_streams_.find(skipped_stream.stream_id);
if (it != ordered_streams_.end()) {
bytes_removed += it->second.EraseTo(skipped_stream.ssn);
}
}
return bytes_removed;
}
void TraditionalReassemblyStreams::ResetStreams(
rtc::ArrayView<const StreamID> stream_ids) {
if (stream_ids.empty()) {
for (auto& entry : ordered_streams_) {
const StreamID& stream_id = entry.first;
OrderedStream& stream = entry.second;
RTC_DLOG(LS_VERBOSE) << log_prefix_
<< "Resetting implicit stream_id=" << *stream_id;
stream.Reset();
}
} else {
for (StreamID stream_id : stream_ids) {
auto it = ordered_streams_.find(stream_id);
if (it != ordered_streams_.end()) {
RTC_DLOG(LS_VERBOSE)
<< log_prefix_ << "Resetting explicit stream_id=" << *stream_id;
it->second.Reset();
}
}
}
}
} // namespace dcsctp

View File

@ -0,0 +1,119 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
#define NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_
#include <stddef.h>
#include <stdint.h>
#include <map>
#include <string>
#include <unordered_map>
#include "absl/strings/string_view.h"
#include "api/array_view.h"
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/rx/reassembly_streams.h"
namespace dcsctp {
// Handles reassembly of incoming data when interleaved message sending
// is not enabled on the association, i.e. when RFC8260 is not in use and
// RFC4960 is to be followed.
class TraditionalReassemblyStreams : public ReassemblyStreams {
public:
TraditionalReassemblyStreams(absl::string_view log_prefix,
OnAssembledMessage on_assembled_message)
: log_prefix_(log_prefix), on_assembled_message_(on_assembled_message) {}
int Add(UnwrappedTSN tsn, Data data) override;
size_t HandleForwardTsn(
UnwrappedTSN new_cumulative_ack_tsn,
rtc::ArrayView<const AnyForwardTsnChunk::SkippedStream> skipped_streams)
override;
void ResetStreams(rtc::ArrayView<const StreamID> stream_ids) override;
private:
using ChunkMap = std::map<UnwrappedTSN, Data>;
// Base class for `UnorderedStream` and `OrderedStream`.
class StreamBase {
protected:
explicit StreamBase(TraditionalReassemblyStreams* parent)
: parent_(*parent) {}
size_t AssembleMessage(const ChunkMap::iterator start,
const ChunkMap::iterator end);
TraditionalReassemblyStreams& parent_;
};
// Manages all received data for a specific unordered stream, and assembles
// messages when possible.
class UnorderedStream : StreamBase {
public:
explicit UnorderedStream(TraditionalReassemblyStreams* parent)
: StreamBase(parent) {}
int Add(UnwrappedTSN tsn, Data data);
// Returns the number of bytes removed from the queue.
size_t EraseTo(UnwrappedTSN tsn);
private:
// Given an iterator to any chunk within the map, try to assemble a message
// into `reassembled_messages` containing it and - if successful - erase
// those chunks from the stream chunks map.
//
// Returns the number of bytes that were assembled.
size_t TryToAssembleMessage(ChunkMap::iterator iter);
ChunkMap chunks_;
};
// Manages all received data for a specific ordered stream, and assembles
// messages when possible.
class OrderedStream : StreamBase {
public:
explicit OrderedStream(TraditionalReassemblyStreams* parent)
: StreamBase(parent), next_ssn_(ssn_unwrapper_.Unwrap(SSN(0))) {}
int Add(UnwrappedTSN tsn, Data data);
size_t EraseTo(SSN ssn);
void Reset() {
ssn_unwrapper_.Reset();
next_ssn_ = ssn_unwrapper_.Unwrap(SSN(0));
}
private:
// Try to assemble one or several messages in order from the stream.
// Returns the number of bytes assembled if a message was assembled.
size_t TryToAssembleMessage();
size_t TryToAssembleMessages();
// This must be an ordered container to be able to iterate in SSN order.
std::map<UnwrappedSSN, ChunkMap> chunks_by_ssn_;
UnwrappedSSN::Unwrapper ssn_unwrapper_;
UnwrappedSSN next_ssn_;
};
const std::string log_prefix_;
// Callback for when a message has been assembled.
const OnAssembledMessage on_assembled_message_;
// All unordered and ordered streams, managing not-yet-assembled data.
std::unordered_map<StreamID, UnorderedStream, StreamID::Hasher>
unordered_streams_;
std::unordered_map<StreamID, OrderedStream, StreamID::Hasher>
ordered_streams_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_RX_TRADITIONAL_REASSEMBLY_STREAMS_H_

View File

@ -0,0 +1,152 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/rx/traditional_reassembly_streams.h"
#include <cstdint>
#include <memory>
#include <utility>
#include "net/dcsctp/common/sequence_numbers.h"
#include "net/dcsctp/packet/chunk/forward_tsn_chunk.h"
#include "net/dcsctp/packet/chunk/forward_tsn_common.h"
#include "net/dcsctp/packet/data.h"
#include "net/dcsctp/rx/reassembly_streams.h"
#include "net/dcsctp/testing/data_generator.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::MockFunction;
using ::testing::NiceMock;
class TraditionalReassemblyStreamsTest : public testing::Test {
protected:
UnwrappedTSN tsn(uint32_t value) { return tsn_.Unwrap(TSN(value)); }
TraditionalReassemblyStreamsTest() {}
DataGenerator gen_;
UnwrappedTSN::Unwrapper tsn_;
};
TEST_F(TraditionalReassemblyStreamsTest,
AddUnorderedMessageReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
// Adding the end fragment should make it empty again.
EXPECT_EQ(streams.Add(tsn(4), gen_.Unordered({7}, "E")), -6);
}
TEST_F(TraditionalReassemblyStreamsTest,
AddSimpleOrderedMessageReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), -6);
}
TEST_F(TraditionalReassemblyStreamsTest,
AddMoreComplexOrderedMessageReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
Data late = gen_.Ordered({2, 3, 4});
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
EXPECT_EQ(streams.Add(tsn(2), std::move(late)), -8);
}
TEST_F(TraditionalReassemblyStreamsTest,
DeleteUnorderedMessageReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Unordered({1}, "B")), 1);
EXPECT_EQ(streams.Add(tsn(2), gen_.Unordered({2, 3, 4})), 3);
EXPECT_EQ(streams.Add(tsn(3), gen_.Unordered({5, 6})), 2);
EXPECT_EQ(streams.HandleForwardTsn(tsn(3), {}), 6u);
}
TEST_F(TraditionalReassemblyStreamsTest,
DeleteSimpleOrderedMessageReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
EXPECT_EQ(streams.Add(tsn(2), gen_.Ordered({2, 3, 4})), 3);
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
ForwardTsnChunk::SkippedStream skipped[] = {
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
EXPECT_EQ(streams.HandleForwardTsn(tsn(3), skipped), 6u);
}
TEST_F(TraditionalReassemblyStreamsTest,
DeleteManyOrderedMessagesReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
gen_.Ordered({2, 3, 4});
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
// Expire all three messages
ForwardTsnChunk::SkippedStream skipped[] = {
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(2))};
EXPECT_EQ(streams.HandleForwardTsn(tsn(8), skipped), 8u);
}
TEST_F(TraditionalReassemblyStreamsTest,
DeleteOrderedMessageDelivesTwoReturnsCorrectSize) {
NiceMock<MockFunction<ReassemblyStreams::OnAssembledMessage>> on_assembled;
TraditionalReassemblyStreams streams("", on_assembled.AsStdFunction());
EXPECT_EQ(streams.Add(tsn(1), gen_.Ordered({1}, "B")), 1);
gen_.Ordered({2, 3, 4});
EXPECT_EQ(streams.Add(tsn(3), gen_.Ordered({5, 6})), 2);
EXPECT_EQ(streams.Add(tsn(4), gen_.Ordered({7}, "E")), 1);
EXPECT_EQ(streams.Add(tsn(5), gen_.Ordered({1}, "BE")), 1);
EXPECT_EQ(streams.Add(tsn(6), gen_.Ordered({5, 6}, "B")), 2);
EXPECT_EQ(streams.Add(tsn(7), gen_.Ordered({7}, "E")), 1);
// The first ordered message expire, and the following two are delivered.
ForwardTsnChunk::SkippedStream skipped[] = {
ForwardTsnChunk::SkippedStream(StreamID(1), SSN(0))};
EXPECT_EQ(streams.HandleForwardTsn(tsn(4), skipped), 8u);
}
} // namespace
} // namespace dcsctp