dcsctp: Add PacketSender

This is mainly a refactoring commit, to break out packet sending to a
dedicated component.

Bug: webrtc:12943
Change-Id: I78f18933776518caf49737d3952bda97f19ef335
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/228565
Reviewed-by: Florent Castelli <orphis@webrtc.org>
Commit-Queue: Victor Boivie <boivie@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#34772}
This commit is contained in:
Victor Boivie
2021-08-12 15:57:49 +02:00
committed by WebRTC LUCI CQ
parent 6b89130d45
commit abf6188cba
8 changed files with 225 additions and 61 deletions

View File

@ -76,10 +76,25 @@ rtc_library("stream_reset_handler") {
]
}
rtc_library("packet_sender") {
deps = [
"../packet:sctp_packet",
"../public:socket",
"../public:types",
"../timer",
]
sources = [
"packet_sender.cc",
"packet_sender.h",
]
absl_deps = []
}
rtc_library("transmission_control_block") {
deps = [
":context",
":heartbeat_handler",
":packet_sender",
":stream_reset_handler",
"../../../api:array_view",
"../../../rtc_base",
@ -114,6 +129,7 @@ rtc_library("dcsctp_socket") {
deps = [
":context",
":heartbeat_handler",
":packet_sender",
":stream_reset_handler",
":transmission_control_block",
"../../../api:array_view",
@ -201,6 +217,7 @@ if (rtc_include_tests) {
":heartbeat_handler",
":mock_callbacks",
":mock_context",
":packet_sender",
":stream_reset_handler",
"../../../api:array_view",
"../../../rtc_base:checks",
@ -233,6 +250,7 @@ if (rtc_include_tests) {
sources = [
"dcsctp_socket_test.cc",
"heartbeat_handler_test.cc",
"packet_sender_test.cc",
"state_cookie_test.cc",
"stream_reset_handler_test.cc",
]

View File

@ -168,6 +168,8 @@ DcSctpSocket::DcSctpSocket(absl::string_view log_prefix,
TimerOptions(options.t2_shutdown_timeout,
TimerBackoffAlgorithm::kExponential,
options.max_retransmissions))),
packet_sender_(callbacks_,
absl::bind_front(&DcSctpSocket::OnSentPacket, this)),
send_queue_(
log_prefix_,
options_.max_send_buffer_size,
@ -251,7 +253,7 @@ void DcSctpSocket::SendInit() {
connect_params_.initial_tsn, params_builder.Build());
SctpPacket::Builder b(VerificationTag(0), options_);
b.Add(init);
SendPacket(b);
packet_sender_.Send(b);
}
void DcSctpSocket::MakeConnectionParameters() {
@ -316,7 +318,7 @@ void DcSctpSocket::Close() {
Parameters::Builder()
.Add(UserInitiatedAbortCause("Close called"))
.Build()));
SendPacket(b);
packet_sender_.Send(b);
}
InternalClose(ErrorKind::kNoError, "");
} else {
@ -327,7 +329,7 @@ void DcSctpSocket::Close() {
}
void DcSctpSocket::CloseConnectionBecauseOfTooManyTransmissionErrors() {
SendPacket(tcb_->PacketBuilder().Add(AbortChunk(
packet_sender_.Send(tcb_->PacketBuilder().Add(AbortChunk(
true, Parameters::Builder()
.Add(UserInitiatedAbortCause("Too many retransmissions"))
.Build())));
@ -412,7 +414,7 @@ ResetStreamsStatus DcSctpSocket::ResetStreams(
if (reconfig.has_value()) {
SctpPacket::Builder builder = tcb_->PacketBuilder();
builder.Add(*reconfig);
SendPacket(builder);
packet_sender_.Send(builder);
}
RTC_DCHECK(IsConsistent());
@ -751,7 +753,7 @@ bool DcSctpSocket::HandleUnrecognizedChunk(
// cause."
if (tcb_ != nullptr) {
// Need TCB - this chunk must be sent with a correct verification tag.
SendPacket(tcb_->PacketBuilder().Add(
packet_sender_.Send(tcb_->PacketBuilder().Add(
ErrorChunk(Parameters::Builder()
.Add(UnrecognizedChunkTypeCause(std::vector<uint8_t>(
descriptor.data.begin(), descriptor.data.end())))
@ -819,7 +821,7 @@ absl::optional<DurationMs> DcSctpSocket::OnShutdownTimerExpiry() {
// chunk to the protocol parameter 'Association.Max.Retrans'. If this
// threshold is exceeded, the endpoint should destroy the TCB..."
SendPacket(tcb_->PacketBuilder().Add(
packet_sender_.Send(tcb_->PacketBuilder().Add(
AbortChunk(true, Parameters::Builder()
.Add(UserInitiatedAbortCause(
"Too many retransmissions of SHUTDOWN"))
@ -838,28 +840,27 @@ absl::optional<DurationMs> DcSctpSocket::OnShutdownTimerExpiry() {
return tcb_->current_rto();
}
void DcSctpSocket::SendPacket(SctpPacket::Builder& builder) {
if (builder.empty()) {
return;
}
std::vector<uint8_t> payload = builder.Build();
if (RTC_DLOG_IS_ON) {
DebugPrintOutgoing(payload);
}
// The heartbeat interval timer is restarted for every sent packet, to
// fire when the outgoing channel is inactive.
if (tcb_ != nullptr) {
tcb_->heartbeat_handler().RestartTimer();
}
void DcSctpSocket::OnSentPacket(rtc::ArrayView<const uint8_t> packet,
SendPacketStatus status) {
// The packet observer is invoked even if the packet was failed to be sent, to
// indicate an attempt was made.
if (packet_observer_ != nullptr) {
packet_observer_->OnSentPacket(callbacks_.TimeMillis(), payload);
packet_observer_->OnSentPacket(callbacks_.TimeMillis(), packet);
}
if (status == SendPacketStatus::kSuccess) {
if (RTC_DLOG_IS_ON) {
DebugPrintOutgoing(packet);
}
// The heartbeat interval timer is restarted for every sent packet, to
// fire when the outgoing channel is inactive.
if (tcb_ != nullptr) {
tcb_->heartbeat_handler().RestartTimer();
}
++metrics_.tx_packets_count;
}
++metrics_.tx_packets_count;
callbacks_.SendPacketWithStatus(payload);
}
bool DcSctpSocket::ValidateHasTCB() {
@ -902,7 +903,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) {
if (data.payload.empty()) {
// Empty DATA chunks are illegal.
SendPacket(tcb_->PacketBuilder().Add(
packet_sender_.Send(tcb_->PacketBuilder().Add(
ErrorChunk(Parameters::Builder().Add(NoUserDataCause(tsn)).Build())));
callbacks_.OnError(ErrorKind::kProtocolViolation,
"Received DATA chunk with no user data");
@ -922,7 +923,7 @@ void DcSctpSocket::HandleDataCommon(AnyDataChunk& chunk) {
// specification only allows dropping gap-ack-blocks, and that's not
// likely to help as the socket has been trying to fill gaps since the
// watermark was reached.
SendPacket(tcb_->PacketBuilder().Add(AbortChunk(
packet_sender_.Send(tcb_->PacketBuilder().Add(AbortChunk(
true, Parameters::Builder().Add(OutOfResourceErrorCause()).Build())));
InternalClose(ErrorKind::kResourceExhaustion,
"Reassembly Queue is exhausted");
@ -975,12 +976,13 @@ void DcSctpSocket::HandleInit(const CommonHeader& header,
// "A receiver of an INIT with the MIS value of 0 SHOULD abort the
// association."
SendPacket(SctpPacket::Builder(VerificationTag(0), options_)
.Add(AbortChunk(
/*filled_in_verification_tag=*/false,
Parameters::Builder()
.Add(ProtocolViolationCause("INIT malformed"))
.Build())));
packet_sender_.Send(
SctpPacket::Builder(VerificationTag(0), options_)
.Add(AbortChunk(
/*filled_in_verification_tag=*/false,
Parameters::Builder()
.Add(ProtocolViolationCause("INIT malformed"))
.Build())));
InternalClose(ErrorKind::kProtocolViolation, "Received invalid INIT");
return;
}
@ -1069,7 +1071,7 @@ void DcSctpSocket::HandleInit(const CommonHeader& header,
options_.announced_maximum_incoming_streams,
connect_params_.initial_tsn, params_builder.Build());
b.Add(init_ack);
SendPacket(b);
packet_sender_.Send(b);
}
void DcSctpSocket::HandleInitAck(
@ -1091,12 +1093,13 @@ void DcSctpSocket::HandleInitAck(
auto cookie = chunk->parameters().get<StateCookieParameter>();
if (!cookie.has_value()) {
SendPacket(SctpPacket::Builder(connect_params_.verification_tag, options_)
.Add(AbortChunk(
/*filled_in_verification_tag=*/false,
Parameters::Builder()
.Add(ProtocolViolationCause("INIT-ACK malformed"))
.Build())));
packet_sender_.Send(
SctpPacket::Builder(connect_params_.verification_tag, options_)
.Add(AbortChunk(
/*filled_in_verification_tag=*/false,
Parameters::Builder()
.Add(ProtocolViolationCause("INIT-ACK malformed"))
.Build())));
InternalClose(ErrorKind::kProtocolViolation,
"InitAck chunk doesn't contain a cookie");
return;
@ -1108,9 +1111,8 @@ void DcSctpSocket::HandleInitAck(
timer_manager_, log_prefix_, options_, capabilities, callbacks_,
send_queue_, connect_params_.verification_tag,
connect_params_.initial_tsn, chunk->initiate_tag(), chunk->initial_tsn(),
chunk->a_rwnd(), MakeTieTag(callbacks_),
[this]() { return state_ == State::kEstablished; },
absl::bind_front(&DcSctpSocket::SendPacket, this));
chunk->a_rwnd(), MakeTieTag(callbacks_), packet_sender_,
[this]() { return state_ == State::kEstablished; });
RTC_DLOG(LS_VERBOSE) << log_prefix()
<< "Created peer TCB: " << tcb_->ToString();
@ -1171,8 +1173,7 @@ void DcSctpSocket::HandleCookieEcho(
callbacks_, send_queue_, connect_params_.verification_tag,
connect_params_.initial_tsn, cookie->initiate_tag(),
cookie->initial_tsn(), cookie->a_rwnd(), MakeTieTag(callbacks_),
[this]() { return state_ == State::kEstablished; },
absl::bind_front(&DcSctpSocket::SendPacket, this));
packet_sender_, [this]() { return state_ == State::kEstablished; });
RTC_DLOG(LS_VERBOSE) << log_prefix()
<< "Created peer TCB: " << tcb_->ToString();
}
@ -1213,7 +1214,7 @@ bool DcSctpSocket::HandleCookieEchoWithTCB(const CommonHeader& header,
b.Add(ErrorChunk(Parameters::Builder()
.Add(CookieReceivedWhileShuttingDownCause())
.Build()));
SendPacket(b);
packet_sender_.Send(b);
callbacks_.OnError(ErrorKind::kWrongSequence,
"Received COOKIE-ECHO while shutting down");
return false;
@ -1445,7 +1446,7 @@ void DcSctpSocket::HandleShutdownAck(
SctpPacket::Builder b = tcb_->PacketBuilder();
b.Add(ShutdownCompleteChunk(/*tag_reflected=*/false));
SendPacket(b);
packet_sender_.Send(b);
InternalClose(ErrorKind::kNoError, "");
} else {
// https://tools.ietf.org/html/rfc4960#section-8.5.1
@ -1464,7 +1465,7 @@ void DcSctpSocket::HandleShutdownAck(
SctpPacket::Builder b(header.verification_tag, options_);
b.Add(ShutdownCompleteChunk(/*tag_reflected=*/true));
SendPacket(b);
packet_sender_.Send(b);
}
}
@ -1516,7 +1517,7 @@ void DcSctpSocket::HandleForwardTsnCommon(const AnyForwardTsnChunk& chunk) {
"I-FORWARD-TSN received, but not indicated "
"during connection establishment"))
.Build()));
SendPacket(b);
packet_sender_.Send(b);
callbacks_.OnError(ErrorKind::kProtocolViolation,
"Received a FORWARD_TSN without announced peer support");
@ -1564,11 +1565,11 @@ void DcSctpSocket::MaybeSendShutdownOrAck() {
void DcSctpSocket::SendShutdown() {
SctpPacket::Builder b = tcb_->PacketBuilder();
b.Add(ShutdownChunk(tcb_->data_tracker().last_cumulative_acked_tsn()));
SendPacket(b);
packet_sender_.Send(b);
}
void DcSctpSocket::SendShutdownAck() {
SendPacket(tcb_->PacketBuilder().Add(ShutdownAckChunk()));
packet_sender_.Send(tcb_->PacketBuilder().Add(ShutdownAckChunk()));
t2_shutdown_->set_duration(tcb_->current_rto());
t2_shutdown_->Start();
}

View File

@ -46,6 +46,7 @@
#include "net/dcsctp/rx/data_tracker.h"
#include "net/dcsctp/rx/reassembly_queue.h"
#include "net/dcsctp/socket/callback_deferrer.h"
#include "net/dcsctp/socket/packet_sender.h"
#include "net/dcsctp/socket/state_cookie.h"
#include "net/dcsctp/socket/transmission_control_block.h"
#include "net/dcsctp/timer/timer.h"
@ -141,8 +142,8 @@ class DcSctpSocket : public DcSctpSocketInterface {
absl::optional<DurationMs> OnInitTimerExpiry();
absl::optional<DurationMs> OnCookieTimerExpiry();
absl::optional<DurationMs> OnShutdownTimerExpiry();
// Builds the packet from `builder` and sends it (through callbacks).
void SendPacket(SctpPacket::Builder& builder);
void OnSentPacket(rtc::ArrayView<const uint8_t> packet,
SendPacketStatus status);
// Sends SHUTDOWN or SHUTDOWN-ACK if the socket is shutting down and if all
// outstanding data has been acknowledged.
void MaybeSendShutdownOrAck();
@ -258,6 +259,9 @@ class DcSctpSocket : public DcSctpSocketInterface {
const std::unique_ptr<Timer> t1_cookie_;
const std::unique_ptr<Timer> t2_shutdown_;
// Packets that failed to be sent, but should be retried.
PacketSender packet_sender_;
// The actual SendQueue implementation. As data can be sent on a socket before
// the connection is established, this component is not in the TCB.
RRSendQueue send_queue_;

View File

@ -0,0 +1,48 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/socket/packet_sender.h"
#include <utility>
#include <vector>
#include "net/dcsctp/public/types.h"
namespace dcsctp {
PacketSender::PacketSender(DcSctpSocketCallbacks& callbacks,
std::function<void(rtc::ArrayView<const uint8_t>,
SendPacketStatus)> on_sent_packet)
: callbacks_(callbacks), on_sent_packet_(std::move(on_sent_packet)) {}
bool PacketSender::Send(SctpPacket::Builder& builder) {
if (builder.empty()) {
return false;
}
std::vector<uint8_t> payload = builder.Build();
SendPacketStatus status = callbacks_.SendPacketWithStatus(payload);
on_sent_packet_(payload, status);
switch (status) {
case SendPacketStatus::kSuccess: {
return true;
}
case SendPacketStatus::kTemporaryFailure: {
// TODO(boivie): Queue this packet to be retried to be sent later.
return false;
}
case SendPacketStatus::kError: {
// Nothing that can be done.
return false;
}
}
}
} // namespace dcsctp

View File

@ -0,0 +1,40 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef NET_DCSCTP_SOCKET_PACKET_SENDER_H_
#define NET_DCSCTP_SOCKET_PACKET_SENDER_H_
#include "net/dcsctp/packet/sctp_packet.h"
#include "net/dcsctp/public/dcsctp_socket.h"
namespace dcsctp {
// The PacketSender sends packets to the network using the provided callback
// interface. When an attempt to send a packet is made, the `on_sent_packet`
// callback will be triggered.
class PacketSender {
public:
PacketSender(DcSctpSocketCallbacks& callbacks,
std::function<void(rtc::ArrayView<const uint8_t>,
SendPacketStatus)> on_sent_packet);
// Sends the packet, and returns true if it was sent successfully.
bool Send(SctpPacket::Builder& builder);
private:
DcSctpSocketCallbacks& callbacks_;
// Callback that will be triggered for every send attempt, indicating the
// status of the operation.
std::function<void(rtc::ArrayView<const uint8_t>, SendPacketStatus)>
on_sent_packet_;
};
} // namespace dcsctp
#endif // NET_DCSCTP_SOCKET_PACKET_SENDER_H_

View File

@ -0,0 +1,50 @@
/*
* Copyright (c) 2021 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "net/dcsctp/socket/packet_sender.h"
#include "net/dcsctp/common/internal_types.h"
#include "net/dcsctp/packet/chunk/cookie_ack_chunk.h"
#include "net/dcsctp/socket/mock_dcsctp_socket_callbacks.h"
#include "rtc_base/gunit.h"
#include "test/gmock.h"
namespace dcsctp {
namespace {
using ::testing::_;
constexpr VerificationTag kVerificationTag(123);
class PacketSenderTest : public testing::Test {
protected:
PacketSenderTest() : sender_(callbacks_, on_send_fn_.AsStdFunction()) {}
SctpPacket::Builder PacketBuilder() const {
return SctpPacket::Builder(kVerificationTag, options_);
}
DcSctpOptions options_;
testing::NiceMock<MockDcSctpSocketCallbacks> callbacks_;
testing::MockFunction<void(rtc::ArrayView<const uint8_t>, SendPacketStatus)>
on_send_fn_;
PacketSender sender_;
};
TEST_F(PacketSenderTest, SendPacketCallsCallback) {
EXPECT_CALL(on_send_fn_, Call(_, SendPacketStatus::kSuccess));
EXPECT_TRUE(sender_.Send(PacketBuilder().Add(CookieAckChunk())));
EXPECT_CALL(callbacks_, SendPacketWithStatus)
.WillOnce(testing::Return(SendPacketStatus::kError));
EXPECT_CALL(on_send_fn_, Call(_, SendPacketStatus::kError));
EXPECT_FALSE(sender_.Send(PacketBuilder().Add(CookieAckChunk())));
}
} // namespace
} // namespace dcsctp

View File

@ -131,10 +131,10 @@ void TransmissionControlBlock::SendBufferedPackets(SctpPacket::Builder& builder,
builder.Add(DataChunk(tsn, std::move(data), false));
}
}
if (builder.empty()) {
if (!packet_sender_.Send(builder)) {
break;
}
Send(builder);
if (cookie_echo_chunk_.has_value()) {
// https://tools.ietf.org/html/rfc4960#section-5.1

View File

@ -29,6 +29,7 @@
#include "net/dcsctp/socket/capabilities.h"
#include "net/dcsctp/socket/context.h"
#include "net/dcsctp/socket/heartbeat_handler.h"
#include "net/dcsctp/socket/packet_sender.h"
#include "net/dcsctp/socket/stream_reset_handler.h"
#include "net/dcsctp/timer/timer.h"
#include "net/dcsctp/tx/retransmission_error_counter.h"
@ -55,8 +56,8 @@ class TransmissionControlBlock : public Context {
TSN peer_initial_tsn,
size_t a_rwnd,
TieTag tie_tag,
std::function<bool()> is_connection_established,
std::function<void(SctpPacket::Builder&)> send_fn)
PacketSender& packet_sender,
std::function<bool()> is_connection_established)
: log_prefix_(log_prefix),
options_(options),
timer_manager_(timer_manager),
@ -79,7 +80,7 @@ class TransmissionControlBlock : public Context {
peer_initial_tsn_(peer_initial_tsn),
tie_tag_(tie_tag),
is_connection_established_(std::move(is_connection_established)),
send_fn_(std::move(send_fn)),
packet_sender_(packet_sender),
rto_(options),
tx_error_counter_(log_prefix, options),
data_tracker_(log_prefix, delayed_ack_timer_.get(), peer_initial_tsn),
@ -124,7 +125,9 @@ class TransmissionControlBlock : public Context {
bool HasTooManyTxErrors() const override {
return tx_error_counter_.IsExhausted();
}
void Send(SctpPacket::Builder& builder) override { send_fn_(builder); }
void Send(SctpPacket::Builder& builder) override {
packet_sender_.Send(builder);
}
// Other accessors
DataTracker& data_tracker() { return data_tracker_; }
@ -202,7 +205,7 @@ class TransmissionControlBlock : public Context {
// Nonce, used to detect reconnections.
const TieTag tie_tag_;
const std::function<bool()> is_connection_established_;
const std::function<void(SctpPacket::Builder&)> send_fn_;
PacketSender& packet_sender_;
RetransmissionTimeout rto_;
RetransmissionErrorCounter tx_error_counter_;