Use MaybeWorkerThread in TaskQueuePacedSender

The pacer can thus run on the Worker thread or an owned TQ depending on field trial string "WebRTC-SendPacketsOnWorkerThread"

Bug: webrtc:14502
Change-Id: Ic74b92b21371cc62c7b2f62f039bc800dcceef8c
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/277622
Reviewed-by: Evan Shrubsole <eshr@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Reviewed-by: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#38301}
This commit is contained in:
Per Kjellander
2022-10-04 13:45:09 +02:00
committed by WebRTC LUCI CQ
parent a4cc19bd99
commit 9dc43057cf
4 changed files with 106 additions and 73 deletions

View File

@ -35,6 +35,7 @@ rtc_library("pacing") {
"../../api:function_view",
"../../api:sequence_checker",
"../../api/rtc_event_log",
"../../api/task_queue:pending_task_safety_flag",
"../../api/task_queue:task_queue",
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
@ -58,6 +59,7 @@ rtc_library("pacing") {
"../../system_wrappers:metrics",
"../rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",
"../utility:utility",
]
absl_deps = [
"//third_party/abseil-cpp/absl/memory",

View File

@ -14,6 +14,7 @@
#include <utility>
#include "absl/memory/memory.h"
#include "api/task_queue/pending_task_safety_flag.h"
#include "api/transport/network_types.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/field_trial_parser.h"
@ -72,9 +73,7 @@ TaskQueuePacedSender::TaskQueuePacedSender(
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {
task_queue_(field_trials, "TaskQueuePacedSender", task_queue_factory) {
RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
// There are multiple field trials that can affect burst. If multiple bursts
// are specified we pick the largest of the values.
@ -95,14 +94,14 @@ TaskQueuePacedSender::~TaskQueuePacedSender() {
// Post an immediate task to mark the queue as shutting down.
// The rtc::TaskQueue destructor will wait for pending tasks to
// complete before continuing.
task_queue_.PostTask([&]() {
task_queue_.RunOrPost([&]() {
RTC_DCHECK_RUN_ON(&task_queue_);
is_shutdown_ = true;
});
}
void TaskQueuePacedSender::EnsureStarted() {
task_queue_.PostTask([this]() {
task_queue_.RunOrPost([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
is_started_ = true;
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -111,7 +110,7 @@ void TaskQueuePacedSender::EnsureStarted() {
void TaskQueuePacedSender::CreateProbeClusters(
std::vector<ProbeClusterConfig> probe_cluster_configs) {
task_queue_.PostTask(
task_queue_.RunOrPost(
[this, probe_cluster_configs = std::move(probe_cluster_configs)]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.CreateProbeClusters(probe_cluster_configs);
@ -120,14 +119,14 @@ void TaskQueuePacedSender::CreateProbeClusters(
}
void TaskQueuePacedSender::Pause() {
task_queue_.PostTask([this]() {
task_queue_.RunOrPost([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Pause();
});
}
void TaskQueuePacedSender::Resume() {
task_queue_.PostTask([this]() {
task_queue_.RunOrPost([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Resume();
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -135,7 +134,7 @@ void TaskQueuePacedSender::Resume() {
}
void TaskQueuePacedSender::SetCongested(bool congested) {
task_queue_.PostTask([this, congested]() {
task_queue_.RunOrPost([this, congested]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetCongested(congested);
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -144,7 +143,7 @@ void TaskQueuePacedSender::SetCongested(bool congested) {
void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
DataRate padding_rate) {
task_queue_.PostTask([this, pacing_rate, padding_rate]() {
task_queue_.RunOrPost([this, pacing_rate, padding_rate]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -153,30 +152,31 @@ void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
task_queue_.PostTask([this, packets = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp());
task_queue_.TaskQueueForPost()->PostTask(task_queue_.MaybeSafeTask(
safety_.flag(), [this, packets = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets");
for (auto& packet : packets) {
TRACE_EVENT2(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::EnqueuePackets::Loop",
"sequence_number", packet->SequenceNumber(),
"rtp_timestamp", packet->Timestamp());
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(Timestamp::MinusInfinity());
});
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(Timestamp::MinusInfinity());
}));
}
void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
task_queue_.PostTask([this, account_for_audio]() {
task_queue_.RunOrPost([this, account_for_audio]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -184,7 +184,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
}
void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
task_queue_.RunOrPost([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
@ -193,7 +193,7 @@ void TaskQueuePacedSender::SetIncludeOverhead() {
}
void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
task_queue_.PostTask([this, overhead_per_packet]() {
task_queue_.RunOrPost([this, overhead_per_packet]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetTransportOverhead(overhead_per_packet);
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -201,7 +201,7 @@ void TaskQueuePacedSender::SetTransportOverhead(DataSize overhead_per_packet) {
}
void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
task_queue_.PostTask([this, limit]() {
task_queue_.RunOrPost([this, limit]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetQueueTimeLimit(limit);
MaybeProcessPackets(Timestamp::MinusInfinity());
@ -330,9 +330,11 @@ void TaskQueuePacedSender::MaybeProcessPackets(
}
}
task_queue_.Get()->PostDelayedTaskWithPrecision(
task_queue_.TaskQueueForDelayedTasks()->PostDelayedTaskWithPrecision(
precision,
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
task_queue_.MaybeSafeTask(
safety_.flag(),
[this, next_send_time]() { MaybeProcessPackets(next_send_time); }),
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)));
next_process_time_ = next_send_time;
}

View File

@ -17,7 +17,6 @@
#include <memory>
#include <vector>
#include "absl/base/attributes.h"
#include "absl/types/optional.h"
#include "api/field_trials_view.h"
#include "api/sequence_checker.h"
@ -28,9 +27,9 @@
#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/maybe_worker_thread.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -187,10 +186,13 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
// TODO(webrtc:14502): Remove stats_mutex_ when pacer runs on the worker
// thread.
mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);
rtc::TaskQueue task_queue_;
ScopedTaskSafety safety_;
MaybeWorkerThread task_queue_;
};
} // namespace webrtc
#endif // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_

View File

@ -188,10 +188,33 @@ std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
return packets;
}
TEST(TaskQueuePacedSenderTest, PacesPackets) {
constexpr char kSendPacketOnWorkerThreadFieldTrial[] =
"WebRTC-SendPacketsOnWorkerThread/Enabled/";
std::vector<std::string> ParameterizedFieldTrials() {
return {{""}, {kSendPacketOnWorkerThreadFieldTrial}};
}
bool UsingWorkerThread(absl::string_view field_trials) {
return field_trials.find(kSendPacketOnWorkerThreadFieldTrial) !=
std::string::npos;
}
class TaskQueuePacedSenderTest
: public ::testing::TestWithParam<std::string /*field_trials*/> {};
INSTANTIATE_TEST_SUITE_P(TaskQueuePacedSenderTest,
TaskQueuePacedSenderTest,
testing::ValuesIn(ParameterizedFieldTrials()),
[](const testing::TestParamInfo<std::string>& info) {
return UsingWorkerThread(info.param) ? "UsingWt"
: "OwnedTQ";
});
TEST_P(TaskQueuePacedSenderTest, PacesPackets) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
@ -199,6 +222,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
SequenceChecker sequence_checker;
pacer.SetPacingRates(
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero());
@ -216,6 +240,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
if (packets_sent == kPacketsToSend) {
end_time = time_controller.GetClock()->CurrentTime();
}
EXPECT_EQ(sequence_checker.IsCurrent(), UsingWorkerThread(GetParam()));
});
const Timestamp start_time = time_controller.GetClock()->CurrentTime();
@ -228,10 +253,10 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
}
TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
TEST_P(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
@ -265,7 +290,9 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
first_packet_time = time_controller.GetClock()->CurrentTime();
} else if (second_packet_time.IsInfinite()) {
second_packet_time = time_controller.GetClock()->CurrentTime();
pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
// Avoid invoke SetPacingRate in the context of sending a packet.
time_controller.GetMainThread()->PostTask(
[&] { pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero()); });
} else {
third_packet_time = time_controller.GetClock()->CurrentTime();
}
@ -280,10 +307,10 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
1.0);
}
TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
TEST_P(TaskQueuePacedSenderTest, SendsAudioImmediately) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
@ -312,11 +339,11 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
::testing::Mock::VerifyAndClearExpectations(&packet_router);
}
TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
TEST_P(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow,
@ -349,11 +376,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
::testing::Mock::VerifyAndClearExpectations(&packet_router);
}
TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
TEST_P(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow,
@ -386,9 +413,9 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
TEST_P(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
ScopedKeyValueConfig trials(
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
@ -457,11 +484,11 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
time_controller.AdvanceTime(TimeDelta::Millis(2));
}
TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
TEST_P(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
// Set min_probe_delta to be less than kMinSleepTime (1ms).
const TimeDelta kMinProbeDelta = TimeDelta::Micros(200);
ScopedKeyValueConfig trials(
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:200us/");
GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:200us/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
@ -519,13 +546,13 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
}
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
TEST_P(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(10);
const int kPacketBasedHoldback = 5;
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
@ -569,13 +596,13 @@ TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
time_controller.AdvanceTime(TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
TEST_P(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
const TimeDelta kFixedCoalescingWindow = TimeDelta::Millis(2);
const int kPacketBasedHoldback = 5;
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
kFixedCoalescingWindow, kPacketBasedHoldback);
@ -616,10 +643,10 @@ TEST(TaskQueuePacedSenderTest, FixedHoldBackHasPriorityOverPackets) {
time_controller.AdvanceTime(kFixedCoalescingWindow);
}
TEST(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
TEST_P(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
// Set a low `min_probe_delta` to let probing finish during send loop.
ScopedKeyValueConfig trials(
"WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
GetParam() + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
@ -666,11 +693,11 @@ TEST(TaskQueuePacedSenderTest, ProbingStopDuringSendLoop) {
time_controller.AdvanceTime(kPacketsPacedTime + TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, Stats) {
TEST_P(TaskQueuePacedSenderTest, Stats) {
static constexpr Timestamp kStartTime = Timestamp::Millis(1234);
GlobalSimulatedTimeController time_controller(kStartTime);
MockPacketRouter packet_router;
ScopedKeyValueConfig trials;
ScopedKeyValueConfig trials(GetParam());
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router, trials,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime,
@ -737,19 +764,19 @@ TEST(TaskQueuePacedSenderTest, Stats) {
EXPECT_TRUE(pacer.ExpectedQueueTime().IsZero());
}
// TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
// pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) {
test::ScopedKeyValueConfig experiments(
"WebRTC-SlackedTaskQueuePacedSender/Disabled/");
ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Disabled/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
TaskQueueWithFakePrecisionFactory task_queue_factory(
time_controller.GetTaskQueueFactory());
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
experiments, &task_queue_factory,
PacingController::kMinSleepTime,
TaskQueuePacedSender::kNoPacketHoldback);
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Send enough packets (covering one second) that pacing is triggered, i.e.
// delayed tasks being scheduled.
@ -786,19 +813,19 @@ TEST(TaskQueuePacedSenderTest, HighPrecisionPacingWhenSlackIsDisabled) {
EXPECT_GT(task_queue_factory.delayed_high_precision_count(), 0);
}
// TODO(webrtc:14502): Rewrite these tests if the functionality is needed if
// pacing is done on the worker thread.
TEST(TaskQueuePacedSenderTest, LowPrecisionPacingWhenSlackIsEnabled) {
test::ScopedKeyValueConfig experiments(
"WebRTC-SlackedTaskQueuePacedSender/Enabled/");
ScopedKeyValueConfig trials("WebRTC-SlackedTaskQueuePacedSender/Enabled/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
TaskQueueWithFakePrecisionFactory task_queue_factory(
time_controller.GetTaskQueueFactory());
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
experiments, &task_queue_factory,
PacingController::kMinSleepTime,
TaskQueuePacedSender::kNoPacketHoldback);
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router, trials, &task_queue_factory,
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Send enough packets (covering one second) that pacing is triggered, i.e.
// delayed tasks being scheduled.