diff --git a/modules/pacing/BUILD.gn b/modules/pacing/BUILD.gn index 1a4e9a5512..cabcd9300b 100644 --- a/modules/pacing/BUILD.gn +++ b/modules/pacing/BUILD.gn @@ -93,6 +93,7 @@ if (rtc_include_tests) { deps = [ ":interval_budget", ":pacing", + "../../api/transport:network_control", "../../api/units:data_rate", "../../api/units:time_delta", "../../modules/utility:mock_process_thread", diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index 33780e001c..107316d4e1 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -39,6 +39,11 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2); // time. Applies only to periodic mode. constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30); +// Allow probes to be processed slightly ahead of inteded send time. Currently +// set to 1ms as this is intended to allow times be rounded down to the nearest +// millisecond. +constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1); + constexpr int kFirstPriority = 0; bool IsDisabled(const WebRtcKeyValueConfig& field_trials, @@ -306,7 +311,9 @@ void PacingController::EnqueuePacketInternal( } TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { - if (last_process_time_.IsMinusInfinity()) { + // If no previous processing, or last process was "in the future" because of + // early probe processing, then there is no elapsed time to add budget for. + if (last_process_time_.IsMinusInfinity() || now < last_process_time_) { return TimeDelta::Zero(); } RTC_DCHECK_GE(now, last_process_time_); @@ -400,10 +407,13 @@ void PacingController::ProcessPackets() { Timestamp target_send_time = now; if (mode_ == ProcessMode::kDynamic) { target_send_time = NextSendTime(); + TimeDelta early_execute_margin = + prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero(); if (target_send_time.IsMinusInfinity()) { target_send_time = now; - } else if (now < target_send_time) { + } else if (now < target_send_time - early_execute_margin) { // We are too early, but if queue is empty still allow draining some debt. + // Probing is allowed to be sent up to kMinSleepTime early. TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now); UpdateBudgetWithElapsedTime(elapsed_time); return; @@ -582,7 +592,7 @@ void PacingController::ProcessPackets() { // If we are currently probing, we need to stop the send loop when we have // reached the send target. - if (is_probing && data_sent > recommended_probe_size) { + if (is_probing && data_sent >= recommended_probe_size) { break; } @@ -703,8 +713,9 @@ void PacingController::OnPaddingSent(DataSize data_sent) { if (data_sent > DataSize::Zero()) { UpdateBudgetWithSentData(data_sent); } - last_send_time_ = CurrentTime(); - last_process_time_ = CurrentTime(); + Timestamp now = CurrentTime(); + last_send_time_ = now; + last_process_time_ = now; } void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) { diff --git a/modules/pacing/task_queue_paced_sender.cc b/modules/pacing/task_queue_paced_sender.cc index db748f30b4..eb8b11bb6e 100644 --- a/modules/pacing/task_queue_paced_sender.cc +++ b/modules/pacing/task_queue_paced_sender.cc @@ -218,20 +218,30 @@ void TaskQueuePacedSender::MaybeProcessPackets( next_process_time = pacing_controller_.NextSendTime(); } - const TimeDelta min_sleep = pacing_controller_.IsProbing() - ? PacingController::kMinSleepTime - : hold_back_window_; - next_process_time = std::max(now + min_sleep, next_process_time); + absl::optional time_to_next_process; + if (pacing_controller_.IsProbing() && + next_process_time != next_process_time_) { + // If we're probing and there isn't already a wakeup scheduled for the next + // process time, always post a task and just round sleep time down to + // nearest millisecond. + time_to_next_process = + std::max(TimeDelta::Zero(), + (next_process_time - now).RoundDownTo(TimeDelta::Millis(1))); + } else if (next_process_time_.IsMinusInfinity() || + next_process_time <= next_process_time_ - hold_back_window_) { + // Schedule a new task since there is none currently scheduled + // (|next_process_time_| is infinite), or the new process time is at least + // one holdback window earlier than whatever is currently scheduled. + time_to_next_process = std::max(next_process_time - now, hold_back_window_); + } - TimeDelta sleep_time = next_process_time - now; - if (next_process_time_.IsMinusInfinity() || - next_process_time <= - next_process_time_ - PacingController::kMinSleepTime) { + if (time_to_next_process) { + // Set a new scheduled process time and post a delayed task. next_process_time_ = next_process_time; task_queue_.PostDelayedTask( [this, next_process_time]() { MaybeProcessPackets(next_process_time); }, - sleep_time.ms()); + time_to_next_process->ms()); } MaybeUpdateStats(false); diff --git a/modules/pacing/task_queue_paced_sender_unittest.cc b/modules/pacing/task_queue_paced_sender_unittest.cc index b02f387768..7fe21d1f2e 100644 --- a/modules/pacing/task_queue_paced_sender_unittest.cc +++ b/modules/pacing/task_queue_paced_sender_unittest.cc @@ -10,12 +10,14 @@ #include "modules/pacing/task_queue_paced_sender.h" +#include #include #include #include #include #include +#include "api/transport/network_types.h" #include "modules/pacing/packet_router.h" #include "modules/utility/include/mock/mock_process_thread.h" #include "test/field_trial.h" @@ -63,13 +65,12 @@ class StatsUpdateObserver { class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { public: - TaskQueuePacedSenderForTest( - Clock* clock, - PacketRouter* packet_router, - RtcEventLog* event_log, - const WebRtcKeyValueConfig* field_trials, - TaskQueueFactory* task_queue_factory, - TimeDelta hold_back_window = PacingController::kMinSleepTime) + TaskQueuePacedSenderForTest(Clock* clock, + PacketRouter* packet_router, + RtcEventLog* event_log, + const WebRtcKeyValueConfig* field_trials, + TaskQueueFactory* task_queue_factory, + TimeDelta hold_back_window) : TaskQueuePacedSender(clock, packet_router, event_log, @@ -84,6 +85,27 @@ class TaskQueuePacedSenderForTest : public TaskQueuePacedSender { size_t num_stats_updates_ = 0; }; + +std::vector> GeneratePadding( + DataSize target_size) { + // 224 bytes is the max padding size for plain padding packets generated by + // RTPSender::GeneratePadding(). + const DataSize kMaxPaddingPacketSize = DataSize::Bytes(224); + DataSize padding_generated = DataSize::Zero(); + std::vector> padding_packets; + while (padding_generated < target_size) { + DataSize packet_size = + std::min(target_size - padding_generated, kMaxPaddingPacketSize); + padding_generated += packet_size; + auto padding_packet = + std::make_unique(/*extensions=*/nullptr); + padding_packet->set_packet_type(RtpPacketMediaType::kPadding); + padding_packet->SetPadding(packet_size.bytes()); + padding_packets.push_back(std::move(padding_packet)); + } + return padding_packets; +} + } // namespace namespace test { @@ -406,5 +428,125 @@ namespace test { EXPECT_EQ(pacer.num_stats_updates_, num_expected_stats_updates); } + TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) { + ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/"); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); + + // Set rates so one packet adds 4ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + EXPECT_CALL(packet_router, GeneratePadding(_)) + .WillRepeatedly( + [](DataSize target_size) { return GeneratePadding(target_size); }); + + // Enqueue two packets, only the first is sent immediately and the next + // will be scheduled for sending in 4ms. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 2)); + const int kNotAProbe = PacedPacketInfo::kNotAProbe; + EXPECT_CALL( + packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kNotAProbe))); + // Advance to less than 3ms before next packet send time. + time_controller.AdvanceTime(TimeDelta::Micros(1001)); + + // Trigger a probe at 4x the current pacing rate and insert the number of + // packets the probe needs. + const DataRate kProbeRate = 2 * kPacingDataRate; + const int kProbeClusterId = 1; + pacer.CreateProbeCluster(kProbeRate, kProbeClusterId); + + // Expected size for each probe in a cluster is twice the expected bits + // sent during min_probe_delta. + const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2); + const DataSize kProbeSize = kProbeRate * kProbeTimeDelta; + const size_t kNumPacketsInProbe = + (kProbeSize + kPacketSize - DataSize::Bytes(1)) / kPacketSize; + EXPECT_CALL( + packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(kNumPacketsInProbe); + + pacer.EnqueuePackets( + GeneratePackets(RtpPacketMediaType::kVideo, kNumPacketsInProbe)); + time_controller.AdvanceTime(TimeDelta::Zero()); + + // The pacer should have scheduled the next probe to be sent in + // kProbeTimeDelta. That there was existing scheduled call less than + // PacingController::kMinSleepTime before this should not matter. + + EXPECT_CALL( + packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(AtLeast(1)); + time_controller.AdvanceTime(TimeDelta::Millis(2)); + } + + TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) { + // Set min_probe_delta to be less than kMinSleepTime (1ms). + const TimeDelta kMinProbeDelta = TimeDelta::Micros(100); + ScopedFieldTrials trials( + "WebRTC-Bwe-ProbingBehavior/min_probe_delta:100us/"); + GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234)); + MockPacketRouter packet_router; + TaskQueuePacedSenderForTest pacer( + time_controller.GetClock(), &packet_router, + /*event_log=*/nullptr, + /*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(), + PacingController::kMinSleepTime); + + // Set rates so one packet adds 4ms of buffer level. + const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize); + const TimeDelta kPacketPacingTime = TimeDelta::Millis(4); + const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime; + pacer.SetPacingRates(kPacingDataRate, /*padding_rate=*/DataRate::Zero()); + EXPECT_CALL(packet_router, FetchFec).WillRepeatedly([]() { + return std::vector>(); + }); + EXPECT_CALL(packet_router, GeneratePadding) + .WillRepeatedly( + [](DataSize target_size) { return GeneratePadding(target_size); }); + + // Set a high probe rate. + const int kProbeClusterId = 1; + DataRate kProbingRate = kPacingDataRate * 10; + pacer.CreateProbeCluster(kProbingRate, kProbeClusterId); + + // Advance time less than PacingController::kMinSleepTime, probing packets + // for the first millisecond should be sent immediately. Min delta between + // probes is 2x 100us, meaning 4 times per ms we will get least one call to + // SendPacket(). + DataSize data_sent = DataSize::Zero(); + EXPECT_CALL( + packet_router, + SendPacket(_, ::testing::Field(&PacedPacketInfo::probe_cluster_id, + kProbeClusterId))) + .Times(AtLeast(4)) + .WillRepeatedly([&](std::unique_ptr packet, + const PacedPacketInfo&) { + data_sent += + DataSize::Bytes(packet->payload_size() + packet->padding_size()); + }); + + // Add one packet to kickstart probing, the rest will be padding packets. + pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 1)); + time_controller.AdvanceTime(kMinProbeDelta); + + // Verify the amount of probing data sent. + EXPECT_EQ(data_sent, kProbingRate * TimeDelta::Millis(1)); + } } // namespace test } // namespace webrtc