diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index fad90186b7..56eed92682 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -32,11 +32,16 @@ PacedSender::PacedSender(Clock* clock, RtcEventLog* event_log, const WebRtcKeyValueConfig* field_trials, ProcessThread* process_thread) - : pacing_controller_(clock, + : process_mode_((field_trials != nullptr && + field_trials->Lookup("WebRTC-Pacer-DynamicProcess") + .find("Enabled") == 0) + ? PacingController::ProcessMode::kDynamic + : PacingController::ProcessMode::kPeriodic), + pacing_controller_(clock, static_cast(this), event_log, field_trials, - PacingController::ProcessMode::kPeriodic), + process_mode_), clock_(clock), packet_router_(packet_router), process_thread_(process_thread) { @@ -45,8 +50,9 @@ PacedSender::PacedSender(Clock* clock, } PacedSender::~PacedSender() { - if (process_thread_) + if (process_thread_) { process_thread_->DeRegisterModule(&module_proxy_); + } } void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { @@ -62,8 +68,9 @@ void PacedSender::Pause() { // Tell the process thread to call our TimeUntilNextProcess() method to get // a new (longer) estimate for when to call Process(). - if (process_thread_) + if (process_thread_) { process_thread_->WakeUp(&module_proxy_); + } } void PacedSender::Resume() { @@ -74,31 +81,44 @@ void PacedSender::Resume() { // Tell the process thread to call our TimeUntilNextProcess() method to // refresh the estimate for when to call Process(). - if (process_thread_) + if (process_thread_) { process_thread_->WakeUp(&module_proxy_); + } } void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { - rtc::CritScope cs(&critsect_); - pacing_controller_.SetCongestionWindow(congestion_window_size); + { + rtc::CritScope cs(&critsect_); + pacing_controller_.SetCongestionWindow(congestion_window_size); + } + MaybeWakupProcessThread(); } void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { - rtc::CritScope cs(&critsect_); - pacing_controller_.UpdateOutstandingData(outstanding_data); + { + rtc::CritScope cs(&critsect_); + pacing_controller_.UpdateOutstandingData(outstanding_data); + } + MaybeWakupProcessThread(); } void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { - rtc::CritScope cs(&critsect_); - pacing_controller_.SetPacingRates(pacing_rate, padding_rate); + { + rtc::CritScope cs(&critsect_); + pacing_controller_.SetPacingRates(pacing_rate, padding_rate); + } + MaybeWakupProcessThread(); } void PacedSender::EnqueuePackets( std::vector> packets) { - rtc::CritScope cs(&critsect_); - for (auto& packet : packets) { - pacing_controller_.EnqueuePacket(std::move(packet)); + { + rtc::CritScope cs(&critsect_); + for (auto& packet : packets) { + pacing_controller_.EnqueuePacket(std::move(packet)); + } } + MaybeWakupProcessThread(); } void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { @@ -144,9 +164,21 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) { RTC_DCHECK(!process_thread || process_thread == process_thread_); } +void PacedSender::MaybeWakupProcessThread() { + // Tell the process thread to call our TimeUntilNextProcess() method to get + // a new time for when to call Process(). + if (process_thread_ && + process_mode_ == PacingController::ProcessMode::kDynamic) { + process_thread_->WakeUp(&module_proxy_); + } +} + void PacedSender::SetQueueTimeLimit(TimeDelta limit) { - rtc::CritScope cs(&critsect_); - pacing_controller_.SetQueueTimeLimit(limit); + { + rtc::CritScope cs(&critsect_); + pacing_controller_.SetQueueTimeLimit(limit); + } + MaybeWakupProcessThread(); } void PacedSender::SendRtpPacket(std::unique_ptr packet, diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index 3539c53619..06a6c26e16 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -134,9 +134,10 @@ class PacedSender : public Module, // Called when the prober is associated with a process thread. void ProcessThreadAttached(ProcessThread* process_thread) override; - private: - // Methods implementing PacedSenderController:PacketSender. + // In dynamic process mode, refreshes the next process time. + void MaybeWakupProcessThread(); + // Methods implementing PacedSenderController:PacketSender. void SendRtpPacket(std::unique_ptr packet, const PacedPacketInfo& cluster_info) override RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); @@ -163,6 +164,7 @@ class PacedSender : public Module, } module_proxy_{this}; rtc::CriticalSection critsect_; + const PacingController::ProcessMode process_mode_; PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); Clock* const clock_; diff --git a/modules/pacing/paced_sender_unittest.cc b/modules/pacing/paced_sender_unittest.cc index feb6c072ed..23f1d6014e 100644 --- a/modules/pacing/paced_sender_unittest.cc +++ b/modules/pacing/paced_sender_unittest.cc @@ -39,7 +39,6 @@ constexpr size_t kDefaultPacketSize = 234; namespace webrtc { namespace test { - // Mock callback implementing the raw api. class MockCallback : public PacketRouter { public: @@ -51,69 +50,88 @@ class MockCallback : public PacketRouter { std::vector>(size_t target_size_bytes)); }; -std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { - auto packet = std::make_unique(nullptr); - packet->set_packet_type(type); - switch (type) { - case RtpPacketToSend::Type::kAudio: - packet->SetSsrc(kAudioSsrc); - break; - case RtpPacketToSend::Type::kVideo: - packet->SetSsrc(kVideoSsrc); - break; - case RtpPacketToSend::Type::kRetransmission: - case RtpPacketToSend::Type::kPadding: - packet->SetSsrc(kVideoRtxSsrc); - break; - case RtpPacketToSend::Type::kForwardErrorCorrection: - packet->SetSsrc(kFlexFecSsrc); - break; +class PacedSenderTest + : public ::testing::TestWithParam { + public: + PacedSenderTest() : clock_(0), paced_module_(nullptr) {} + + void SetUp() override { + EXPECT_CALL(process_thread_, RegisterModule) + .WillOnce(SaveArg<0>(&paced_module_)); + + pacer_ = std::make_unique(&clock_, &callback_, nullptr, + nullptr, &process_thread_); + EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1); } - packet->SetPayloadSize(kDefaultPacketSize); - return packet; -} + protected: + std::unique_ptr BuildRtpPacket(RtpPacketToSend::Type type) { + auto packet = std::make_unique(nullptr); + packet->set_packet_type(type); + switch (type) { + case RtpPacketToSend::Type::kAudio: + packet->SetSsrc(kAudioSsrc); + break; + case RtpPacketToSend::Type::kVideo: + packet->SetSsrc(kVideoSsrc); + break; + case RtpPacketToSend::Type::kRetransmission: + case RtpPacketToSend::Type::kPadding: + packet->SetSsrc(kVideoRtxSsrc); + break; + case RtpPacketToSend::Type::kForwardErrorCorrection: + packet->SetSsrc(kFlexFecSsrc); + break; + } -TEST(PacedSenderTest, PacesPackets) { - SimulatedClock clock(0); - MockCallback callback; - MockProcessThread process_thread; - Module* paced_module = nullptr; - EXPECT_CALL(process_thread, RegisterModule(_, _)) - .WillOnce(SaveArg<0>(&paced_module)); - PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread); - EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1); + packet->SetPayloadSize(kDefaultPacketSize); + return packet; + } + SimulatedClock clock_; + MockCallback callback_; + MockProcessThread process_thread_; + Module* paced_module_; + std::unique_ptr pacer_; +}; + +TEST_P(PacedSenderTest, PacesPackets) { // Insert a number of packets, covering one second. static constexpr size_t kPacketsToSend = 42; - pacer.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend), - DataRate::Zero()); + pacer_->SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend), + DataRate::Zero()); std::vector> packets; for (size_t i = 0; i < kPacketsToSend; ++i) { packets.emplace_back(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); } - pacer.EnqueuePackets(std::move(packets)); + pacer_->EnqueuePackets(std::move(packets)); // Expect all of them to be sent. size_t packets_sent = 0; - clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); - EXPECT_CALL(callback, SendPacket) + clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess()); + EXPECT_CALL(callback_, SendPacket) .WillRepeatedly( [&](std::unique_ptr packet, const PacedPacketInfo& cluster_info) { ++packets_sent; }); - const Timestamp start_time = clock.CurrentTime(); + const Timestamp start_time = clock_.CurrentTime(); while (packets_sent < kPacketsToSend) { - clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); - paced_module->Process(); + clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess()); + paced_module_->Process(); } // Packets should be sent over a period of close to 1s. Expect a little lower // than this since initial probing is a bit quicker. - TimeDelta duration = clock.CurrentTime() - start_time; + TimeDelta duration = clock_.CurrentTime() - start_time; EXPECT_GT(duration, TimeDelta::ms(900)); } +INSTANTIATE_TEST_SUITE_P( + WithAndWithoutDynamicProcess, + PacedSenderTest, + ::testing::Values(PacingController::ProcessMode::kPeriodic, + PacingController::ProcessMode::kDynamic)); + } // namespace test } // namespace webrtc