Wire up new PacedSender code path.

This CL makes the new code path for paced sending functionally complete.
By default, the field trial WebRTC-Pacer-ReferencePackets is Enabled,
meaning that there is no behavior change unless the field trial is
forced to Disabled. This is done in tests, and can be done on the
command line for manual testing.

Bug: webrtc:10633
Change-Id: I0d66c94ef83b5847dee437a785018f09ba3f828d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/144050
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Åsa Persson <asapersson@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28497}
This commit is contained in:
Erik Språng
2019-07-05 16:53:43 +02:00
committed by Commit Bot
parent 668ce0c7fa
commit f6468d2569
13 changed files with 904 additions and 236 deletions

View File

@ -28,6 +28,7 @@ rtc_static_library("pacing") {
deps = [
":interval_budget",
"..:module_api",
"../../api:function_view",
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
"../../api/transport:webrtc_key_value_config",

View File

@ -12,6 +12,7 @@
#include <algorithm>
#include <utility>
#include <vector>
#include "absl/memory/memory.h"
#include "logging/rtc_event_log/rtc_event_log.h"
@ -97,7 +98,9 @@ PacedSender::PacedSender(Clock* clock,
packets_(clock->TimeInMicroseconds()),
packet_counter_(0),
queue_time_limit(kMaxQueueLengthMs),
account_for_audio_(false) {
account_for_audio_(false),
legacy_packet_referencing_(
!IsDisabled(*field_trials_, "WebRTC-Pacer-LegacyPacketReferencing")) {
if (!drain_large_queues_) {
RTC_LOG(LS_WARNING) << "Pacer queues will not be drained,"
"pushback experiment must be enabled.";
@ -328,10 +331,21 @@ void PacedSender::Process() {
int64_t now_us = clock_->TimeInMicroseconds();
int64_t elapsed_time_ms = UpdateTimeAndGetElapsedMs(now_us);
if (ShouldSendKeepalive(now_us)) {
critsect_.Leave();
size_t bytes_sent = packet_router_->TimeToSendPadding(1, PacedPacketInfo());
critsect_.Enter();
OnPaddingSent(bytes_sent);
if (legacy_packet_referencing_) {
critsect_.Leave();
size_t bytes_sent =
packet_router_->TimeToSendPadding(1, PacedPacketInfo());
critsect_.Enter();
OnPaddingSent(bytes_sent);
} else {
critsect_.Leave();
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_router_->GeneratePadding(1);
critsect_.Enter();
for (auto& packet : keepalive_packets) {
EnqueuePacket(std::move(packet));
}
}
}
if (paused_)
@ -364,35 +378,60 @@ void PacedSender::Process() {
bool is_probing = prober_.IsProbing();
PacedPacketInfo pacing_info;
size_t bytes_sent = 0;
size_t recommended_probe_size = 0;
absl::optional<size_t> recommended_probe_size;
if (is_probing) {
pacing_info = prober_.CurrentCluster();
recommended_probe_size = prober_.RecommendedMinProbeSize();
}
size_t bytes_sent = 0;
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!packets_.Empty() && !paused_) {
while (!paused_) {
auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr)
if (packet == nullptr) {
// No packet available to send, check if we should send padding.
if (!legacy_packet_referencing_) {
size_t padding_bytes_to_add =
PaddingBytesToAdd(recommended_probe_size, bytes_sent);
if (padding_bytes_to_add > 0) {
critsect_.Leave();
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_router_->GeneratePadding(padding_bytes_to_add);
critsect_.Enter();
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
const bool owned_rtp_packet = rtp_packet != nullptr;
critsect_.Leave();
RtpPacketSendResult success;
if (rtp_packet != nullptr) {
critsect_.Leave();
packet_router_->SendPacket(std::move(rtp_packet), pacing_info);
critsect_.Enter();
success = RtpPacketSendResult::kSuccess;
} else {
critsect_.Leave();
success = packet_router_->TimeToSendPacket(
packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
packet->is_retransmission(), pacing_info);
critsect_.Enter();
}
critsect_.Enter();
if (success == RtpPacketSendResult::kSuccess ||
success == RtpPacketSendResult::kPacketNotFound) {
// Packet sent or invalid packet, remove it from queue.
@ -400,7 +439,7 @@ void PacedSender::Process() {
bytes_sent += packet->size_in_bytes();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (is_probing && bytes_sent > recommended_probe_size)
if (recommended_probe_size && bytes_sent > *recommended_probe_size)
break;
} else if (owned_rtp_packet) {
// Send failed, but we can't put it back in the queue, remove it without
@ -414,16 +453,17 @@ void PacedSender::Process() {
}
}
if (packets_.Empty() && !Congested()) {
if (legacy_packet_referencing_ && packets_.Empty() && !Congested()) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
int padding_needed =
static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
: padding_budget_.bytes_remaining());
int padding_needed = static_cast<int>(
recommended_probe_size ? (*recommended_probe_size - bytes_sent)
: padding_budget_.bytes_remaining());
if (padding_needed > 0) {
size_t padding_sent = 0;
critsect_.Leave();
size_t padding_sent =
padding_sent =
packet_router_->TimeToSendPadding(padding_needed, pacing_info);
critsect_.Enter();
bytes_sent += padding_sent;
@ -431,6 +471,7 @@ void PacedSender::Process() {
}
}
}
if (is_probing) {
probing_send_failure_ = bytes_sent == 0;
if (!probing_send_failure_)
@ -444,8 +485,41 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread;
}
size_t PacedSender::PaddingBytesToAdd(
absl::optional<size_t> recommended_probe_size,
size_t bytes_sent) {
if (!packets_.Empty()) {
// Actual payload available, no need to add padding.
return 0;
}
if (Congested()) {
// Don't add padding if congested, even if requested for probing.
return 0;
}
if (packet_counter_ == 0) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
return 0;
}
if (recommended_probe_size) {
if (*recommended_probe_size > bytes_sent) {
return *recommended_probe_size - bytes_sent;
}
return 0;
}
return padding_budget_.bytes_remaining();
}
RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
const PacedPacketInfo& pacing_info) {
if (packets_.Empty()) {
return nullptr;
}
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.

View File

@ -17,6 +17,7 @@
#include <memory>
#include "absl/types/optional.h"
#include "api/function_view.h"
#include "api/transport/field_trial_based_config.h"
#include "api/transport/network_types.h"
#include "api/transport/webrtc_key_value_config.h"
@ -135,6 +136,10 @@ class PacedSender : public Module, public RtpPacketPacer {
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t PaddingBytesToAdd(absl::optional<size_t> recommended_probe_size,
size_t bytes_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
const PacedPacketInfo& pacing_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
@ -195,6 +200,11 @@ class PacedSender : public Module, public RtpPacketPacer {
int64_t queue_time_limit RTC_GUARDED_BY(critsect_);
bool account_for_audio_ RTC_GUARDED_BY(critsect_);
// If true, PacedSender should only reference packets as in legacy mode.
// If false, PacedSender may have direct ownership of RtpPacketToSend objects.
// Defaults to true, will be changed to default false soon.
const bool legacy_packet_referencing_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACED_SENDER_H_

View File

@ -146,6 +146,12 @@ RtpRtcp* PacketRouter::FindRtpModule(uint32_t ssrc) {
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
rtc::CritScope cs(&modules_crit_);
// With the new pacer code path, transport sequence numbers are only set here,
// on the pacer thread. Therefore we don't need atomics/synchronization.
if (packet->IsExtensionReserved<TransportSequenceNumber>() &&
packet->SetExtension<TransportSequenceNumber>(transport_seq_)) {
++transport_seq_;
}
for (auto* rtp_module : rtp_send_modules_) {
if (rtp_module->TrySendPacket(packet.get(), cluster_info)) {
const bool can_send_padding =
@ -200,7 +206,8 @@ size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send,
return total_bytes_sent;
}
void PacketRouter::GeneratePadding(size_t target_size_bytes) {
std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
size_t target_size_bytes) {
rtc::CritScope cs(&modules_crit_);
// First try on the last rtp module to have sent media. This increases the
// the chance that any payload based padding will be useful as it will be
@ -212,17 +219,17 @@ void PacketRouter::GeneratePadding(size_t target_size_bytes) {
RTC_DCHECK(std::find(rtp_send_modules_.begin(), rtp_send_modules_.end(),
last_send_module_) != rtp_send_modules_.end());
RTC_DCHECK(last_send_module_->HasBweExtensions());
last_send_module_->GeneratePadding(target_size_bytes);
return;
return last_send_module_->GeneratePadding(target_size_bytes);
}
// Rtp modules are ordered by which stream can most benefit from padding.
for (RtpRtcp* rtp_module : rtp_send_modules_) {
if (rtp_module->SendingMedia() && rtp_module->HasBweExtensions()) {
rtp_module->GeneratePadding(target_size_bytes);
return;
return rtp_module->GeneratePadding(target_size_bytes);
}
}
return {};
}
void PacketRouter::SetTransportWideSequenceNumber(uint16_t sequence_number) {

View File

@ -65,7 +65,8 @@ class PacketRouter : public TransportSequenceNumberAllocator,
virtual size_t TimeToSendPadding(size_t bytes,
const PacedPacketInfo& packet_info);
virtual void GeneratePadding(size_t target_size_bytes);
virtual std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
size_t target_size_bytes);
void SetTransportWideSequenceNumber(uint16_t sequence_number);
uint16_t AllocateSequenceNumber() override;

View File

@ -10,9 +10,12 @@
#include <cstddef>
#include <cstdint>
#include <utility>
#include "absl/memory/memory.h"
#include "api/units/time_delta.h"
#include "modules/pacing/packet_router.h"
#include "modules/rtp_rtcp/include/rtp_header_extension_map.h"
#include "modules/rtp_rtcp/mocks/mock_rtp_rtcp.h"
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/checks.h"
@ -36,6 +39,7 @@ using ::testing::Field;
using ::testing::Gt;
using ::testing::Le;
using ::testing::NiceMock;
using ::testing::Property;
using ::testing::Return;
using ::testing::ReturnPointee;
using ::testing::SaveArg;
@ -296,9 +300,15 @@ TEST(PacketRouterTest, GeneratePaddingPicksCorrectModule) {
packet_router.AddSendRtpModule(&rtp_2, false);
const size_t kPaddingSize = 123;
const size_t kExpectedPaddingPackets = 1;
EXPECT_CALL(rtp_1, GeneratePadding(_)).Times(0);
EXPECT_CALL(rtp_2, GeneratePadding(kPaddingSize)).Times(1);
packet_router.GeneratePadding(kPaddingSize);
EXPECT_CALL(rtp_2, GeneratePadding(kPaddingSize))
.WillOnce([&](size_t padding_size) {
return std::vector<std::unique_ptr<RtpPacketToSend>>(
kExpectedPaddingPackets);
});
auto generated_padding = packet_router.GeneratePadding(kPaddingSize);
EXPECT_EQ(generated_padding.size(), kExpectedPaddingPackets);
packet_router.RemoveSendRtpModule(&rtp_1);
packet_router.RemoveSendRtpModule(&rtp_2);
@ -938,4 +948,86 @@ TEST(PacketRouterRembTest, ReceiveModuleTakesOverWhenLastSendModuleRemoved) {
packet_router.RemoveReceiveRtpModule(&receive_module);
}
TEST(PacketRouterTest, SendPacketWithoutTransportSequenceNumbers) {
PacketRouter packet_router;
NiceMock<MockRtpRtcp> rtp_1;
packet_router.AddSendRtpModule(&rtp_1, false);
const uint16_t kSsrc1 = 1234;
ON_CALL(rtp_1, SendingMedia).WillByDefault(Return(true));
ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
// Send a packet without TransportSequenceNumber extension registered,
// packets sent should not have the extension set.
RtpHeaderExtensionMap extension_manager;
auto packet = absl::make_unique<RtpPacketToSend>(&extension_manager);
packet->SetSsrc(kSsrc1);
EXPECT_CALL(
rtp_1,
TrySendPacket(
Property(&RtpPacketToSend::HasExtension<TransportSequenceNumber>,
false),
_))
.WillOnce(Return(true));
packet_router.SendPacket(std::move(packet), PacedPacketInfo());
packet_router.RemoveSendRtpModule(&rtp_1);
}
TEST(PacketRouterTest, SendPacketAssignsTransportSequenceNumbers) {
PacketRouter packet_router;
NiceMock<MockRtpRtcp> rtp_1;
NiceMock<MockRtpRtcp> rtp_2;
packet_router.AddSendRtpModule(&rtp_1, false);
packet_router.AddSendRtpModule(&rtp_2, false);
const uint16_t kSsrc1 = 1234;
const uint16_t kSsrc2 = 2345;
ON_CALL(rtp_1, SendingMedia).WillByDefault(Return(true));
ON_CALL(rtp_1, SSRC).WillByDefault(Return(kSsrc1));
ON_CALL(rtp_2, SendingMedia).WillByDefault(Return(true));
ON_CALL(rtp_2, SSRC).WillByDefault(Return(kSsrc2));
RtpHeaderExtensionMap extension_manager;
const int kTransportSequenceNumberExtensionId = 1;
extension_manager.Register(kRtpExtensionTransportSequenceNumber,
kTransportSequenceNumberExtensionId);
uint16_t transport_sequence_number = 0;
auto packet = absl::make_unique<RtpPacketToSend>(&extension_manager);
EXPECT_TRUE(packet->ReserveExtension<TransportSequenceNumber>());
packet->SetSsrc(kSsrc1);
EXPECT_CALL(
rtp_1,
TrySendPacket(
Property(&RtpPacketToSend::GetExtension<TransportSequenceNumber>,
transport_sequence_number),
_))
.WillOnce(Return(true));
packet_router.SendPacket(std::move(packet), PacedPacketInfo());
++transport_sequence_number;
packet = absl::make_unique<RtpPacketToSend>(&extension_manager);
EXPECT_TRUE(packet->ReserveExtension<TransportSequenceNumber>());
packet->SetSsrc(kSsrc2);
// There will be a failed attempt to send on kSsrc1 before trying
// the correct RTP module.
EXPECT_CALL(rtp_1, TrySendPacket).WillOnce(Return(false));
EXPECT_CALL(
rtp_2,
TrySendPacket(
Property(&RtpPacketToSend::GetExtension<TransportSequenceNumber>,
transport_sequence_number),
_))
.WillOnce(Return(true));
packet_router.SendPacket(std::move(packet), PacedPacketInfo());
packet_router.RemoveSendRtpModule(&rtp_1);
packet_router.RemoveSendRtpModule(&rtp_2);
}
} // namespace webrtc