diff --git a/modules/pacing/bitrate_prober.cc b/modules/pacing/bitrate_prober.cc index 4192df956b..8dc89e4a86 100644 --- a/modules/pacing/bitrate_prober.cc +++ b/modules/pacing/bitrate_prober.cc @@ -28,7 +28,7 @@ namespace { // we have a min probe packet size of 200 bytes. constexpr size_t kMinProbePacketSize = 200; -constexpr int64_t kProbeClusterTimeoutMs = 5000; +constexpr TimeDelta kProbeClusterTimeout = TimeDelta::Seconds<5>(); } // namespace @@ -55,7 +55,7 @@ BitrateProber::~BitrateProber() { BitrateProber::BitrateProber(const WebRtcKeyValueConfig& field_trials) : probing_state_(ProbingState::kDisabled), - next_probe_time_ms_(-1), + next_probe_time_(Timestamp::PlusInfinity()), total_probe_count_(0), total_failed_probe_count_(0), config_(&field_trials) { @@ -85,32 +85,31 @@ void BitrateProber::OnIncomingPacket(size_t packet_size) { packet_size >= std::min(RecommendedMinProbeSize(), kMinProbePacketSize)) { // Send next probe right away. - next_probe_time_ms_ = -1; + next_probe_time_ = Timestamp::MinusInfinity(); probing_state_ = ProbingState::kActive; } } -void BitrateProber::CreateProbeCluster(int bitrate_bps, - int64_t now_ms, +void BitrateProber::CreateProbeCluster(DataRate bitrate, + Timestamp now, int cluster_id) { RTC_DCHECK(probing_state_ != ProbingState::kDisabled); - RTC_DCHECK_GT(bitrate_bps, 0); + RTC_DCHECK_GT(bitrate, DataRate::Zero()); total_probe_count_++; while (!clusters_.empty() && - now_ms - clusters_.front().time_created_ms > kProbeClusterTimeoutMs) { + now - clusters_.front().created_at > kProbeClusterTimeout) { clusters_.pop(); total_failed_probe_count_++; } ProbeCluster cluster; - cluster.time_created_ms = now_ms; + cluster.created_at = now; cluster.pace_info.probe_cluster_min_probes = config_.min_probe_packets_sent; cluster.pace_info.probe_cluster_min_bytes = - static_cast(static_cast(bitrate_bps) * - config_.min_probe_duration->ms() / 8000); + (bitrate * config_.min_probe_duration.Get()).bytes(); RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0); - cluster.pace_info.send_bitrate_bps = bitrate_bps; + cluster.pace_info.send_bitrate_bps = bitrate.bps(); cluster.pace_info.probe_cluster_id = cluster_id; clusters_.push(cluster); @@ -124,23 +123,21 @@ void BitrateProber::CreateProbeCluster(int bitrate_bps, probing_state_ = ProbingState::kInactive; } -int BitrateProber::TimeUntilNextProbe(int64_t now_ms) { +Timestamp BitrateProber::NextProbeTime(Timestamp now) const { // Probing is not active or probing is already complete. - if (probing_state_ != ProbingState::kActive || clusters_.empty()) - return -1; - - int time_until_probe_ms = 0; - if (next_probe_time_ms_ >= 0) { - time_until_probe_ms = next_probe_time_ms_ - now_ms; - if (time_until_probe_ms < -config_.max_probe_delay->ms()) { - RTC_DLOG(LS_WARNING) << "Probe delay too high" - << " (next_ms:" << next_probe_time_ms_ - << ", now_ms: " << now_ms << ")"; - return -1; - } + if (probing_state_ != ProbingState::kActive || clusters_.empty()) { + return Timestamp::PlusInfinity(); } - return std::max(time_until_probe_ms, 0); + if (next_probe_time_.IsFinite() && + now - next_probe_time_ > config_.max_probe_delay.Get()) { + RTC_DLOG(LS_WARNING) << "Probe delay too high" + << " (next_ms:" << next_probe_time_.ms() + << ", now_ms: " << now.ms() << ")"; + return Timestamp::PlusInfinity(); + } + + return next_probe_time_; } PacedPacketInfo BitrateProber::CurrentCluster() const { @@ -160,19 +157,19 @@ size_t BitrateProber::RecommendedMinProbeSize() const { config_.min_probe_delta->ms() / (8 * 1000); } -void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) { +void BitrateProber::ProbeSent(Timestamp now, size_t bytes) { RTC_DCHECK(probing_state_ == ProbingState::kActive); RTC_DCHECK_GT(bytes, 0); if (!clusters_.empty()) { ProbeCluster* cluster = &clusters_.front(); if (cluster->sent_probes == 0) { - RTC_DCHECK_EQ(cluster->time_started_ms, -1); - cluster->time_started_ms = now_ms; + RTC_DCHECK(cluster->started_at.IsInfinite()); + cluster->started_at = now; } cluster->sent_bytes += static_cast(bytes); cluster->sent_probes += 1; - next_probe_time_ms_ = GetNextProbeTime(*cluster); + next_probe_time_ = CalculateNextProbeTime(*cluster); if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes && cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) { RTC_HISTOGRAM_COUNTS_100000("WebRTC.BWE.Probing.ProbeClusterSizeInBytes", @@ -180,7 +177,7 @@ void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) { RTC_HISTOGRAM_COUNTS_100("WebRTC.BWE.Probing.ProbesPerCluster", cluster->sent_probes); RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.Probing.TimePerProbeCluster", - now_ms - cluster->time_started_ms); + (now - cluster->started_at).ms()); clusters_.pop(); } @@ -189,16 +186,17 @@ void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) { } } -int64_t BitrateProber::GetNextProbeTime(const ProbeCluster& cluster) { +Timestamp BitrateProber::CalculateNextProbeTime( + const ProbeCluster& cluster) const { RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0); - RTC_CHECK_GE(cluster.time_started_ms, 0); + RTC_CHECK(cluster.started_at.IsFinite()); // Compute the time delta from the cluster start to ensure probe bitrate stays // close to the target bitrate. Result is in milliseconds. - int64_t delta_ms = - (8000ll * cluster.sent_bytes + cluster.pace_info.send_bitrate_bps / 2) / - cluster.pace_info.send_bitrate_bps; - return cluster.time_started_ms + delta_ms; + DataSize sent_bytes = DataSize::bytes(cluster.sent_bytes); + DataRate send_bitrate = DataRate::bps(cluster.pace_info.send_bitrate_bps); + TimeDelta delta = sent_bytes / send_bitrate; + return cluster.started_at + delta; } } // namespace webrtc diff --git a/modules/pacing/bitrate_prober.h b/modules/pacing/bitrate_prober.h index 0a9f961d87..ec234e8f5f 100644 --- a/modules/pacing/bitrate_prober.h +++ b/modules/pacing/bitrate_prober.h @@ -61,11 +61,12 @@ class BitrateProber { // Create a cluster used to probe for |bitrate_bps| with |num_probes| number // of probes. - void CreateProbeCluster(int bitrate_bps, int64_t now_ms, int cluster_id); + void CreateProbeCluster(DataRate bitrate, Timestamp now, int cluster_id); - // Returns the number of milliseconds until the next probe should be sent to - // get accurate probing. - int TimeUntilNextProbe(int64_t now_ms); + // Returns the at which the next probe should be sent to get accurate probing. + // If probing is not desired at this time, Timestamp::PlusInfinity() will be + // returned. + Timestamp NextProbeTime(Timestamp now) const; // Information about the current probing cluster. PacedPacketInfo CurrentCluster() const; @@ -78,7 +79,7 @@ class BitrateProber { // multiple packets per probe, this call would be made at the end of sending // the last packet in probe. |probe_size| is the total size of all packets // in probe. - void ProbeSent(int64_t now_ms, size_t probe_size); + void ProbeSent(Timestamp now, size_t probe_size); private: enum class ProbingState { @@ -101,12 +102,12 @@ class BitrateProber { int sent_probes = 0; int sent_bytes = 0; - int64_t time_created_ms = -1; - int64_t time_started_ms = -1; + Timestamp created_at = Timestamp::MinusInfinity(); + Timestamp started_at = Timestamp::MinusInfinity(); int retries = 0; }; - int64_t GetNextProbeTime(const ProbeCluster& cluster); + Timestamp CalculateNextProbeTime(const ProbeCluster& cluster) const; ProbingState probing_state_; @@ -116,7 +117,7 @@ class BitrateProber { std::queue clusters_; // Time the next probe should be sent when in kActive state. - int64_t next_probe_time_ms_; + Timestamp next_probe_time_; int total_probe_count_; int total_failed_probe_count_; diff --git a/modules/pacing/bitrate_prober_unittest.cc b/modules/pacing/bitrate_prober_unittest.cc index c907cdda29..6f3624f4ab 100644 --- a/modules/pacing/bitrate_prober_unittest.cc +++ b/modules/pacing/bitrate_prober_unittest.cc @@ -10,6 +10,8 @@ #include "modules/pacing/bitrate_prober.h" +#include + #include "test/gtest.h" namespace webrtc { @@ -19,17 +21,18 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) { BitrateProber prober(config); EXPECT_FALSE(prober.IsProbing()); - int64_t now_ms = 0; - EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + Timestamp now = Timestamp::ms(0); + const Timestamp start_time = now; + EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity()); - const int kTestBitrate1 = 900000; - const int kTestBitrate2 = 1800000; + const DataRate kTestBitrate1 = DataRate::kbps(900); + const DataRate kTestBitrate2 = DataRate::kbps(1800); const int kClusterSize = 5; const int kProbeSize = 1000; - const int kMinProbeDurationMs = 15; + const TimeDelta kMinProbeDuration = TimeDelta::ms(15); - prober.CreateProbeCluster(kTestBitrate1, now_ms, 0); - prober.CreateProbeCluster(kTestBitrate2, now_ms, 1); + prober.CreateProbeCluster(kTestBitrate1, now, 0); + prober.CreateProbeCluster(kTestBitrate2, now, 1); EXPECT_FALSE(prober.IsProbing()); prober.OnIncomingPacket(kProbeSize); @@ -37,39 +40,40 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) { EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id); // First packet should probe as soon as possible. - EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + EXPECT_EQ(Timestamp::MinusInfinity(), prober.NextProbeTime(now)); for (int i = 0; i < kClusterSize; ++i) { - now_ms += prober.TimeUntilNextProbe(now_ms); - EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + now = std::max(now, prober.NextProbeTime(now)); + EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now))); EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id); - prober.ProbeSent(now_ms, kProbeSize); + prober.ProbeSent(now, kProbeSize); } - EXPECT_GE(now_ms, kMinProbeDurationMs); + EXPECT_GE(now - start_time, kMinProbeDuration); // Verify that the actual bitrate is withing 10% of the target. - double bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / now_ms; + DataRate bitrate = + DataSize::bytes(kProbeSize * (kClusterSize - 1)) / (now - start_time); EXPECT_GT(bitrate, kTestBitrate1 * 0.9); EXPECT_LT(bitrate, kTestBitrate1 * 1.1); - now_ms += prober.TimeUntilNextProbe(now_ms); - int64_t probe2_started = now_ms; + now = std::max(now, prober.NextProbeTime(now)); + Timestamp probe2_started = now; for (int i = 0; i < kClusterSize; ++i) { - now_ms += prober.TimeUntilNextProbe(now_ms); - EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); + now = std::max(now, prober.NextProbeTime(now)); + EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now))); EXPECT_EQ(1, prober.CurrentCluster().probe_cluster_id); - prober.ProbeSent(now_ms, kProbeSize); + prober.ProbeSent(now, kProbeSize); } // Verify that the actual bitrate is withing 10% of the target. - int duration = now_ms - probe2_started; - EXPECT_GE(duration, kMinProbeDurationMs); - bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / duration; + TimeDelta duration = now - probe2_started; + EXPECT_GE(duration, kMinProbeDuration); + bitrate = DataSize::bytes(kProbeSize * (kClusterSize - 1)) / duration; EXPECT_GT(bitrate, kTestBitrate2 * 0.9); EXPECT_LT(bitrate, kTestBitrate2 * 1.1); - EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity()); EXPECT_FALSE(prober.IsProbing()); } @@ -77,23 +81,23 @@ TEST(BitrateProberTest, DoesntProbeWithoutRecentPackets) { const FieldTrialBasedConfig config; BitrateProber prober(config); - int64_t now_ms = 0; - EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + Timestamp now = Timestamp::Zero(); + EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity()); - prober.CreateProbeCluster(900000, now_ms, 0); + prober.CreateProbeCluster(DataRate::kbps(900), now, 0); EXPECT_FALSE(prober.IsProbing()); prober.OnIncomingPacket(1000); EXPECT_TRUE(prober.IsProbing()); - EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms)); - prober.ProbeSent(now_ms, 1000); + EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now))); + prober.ProbeSent(now, 1000); // Let time pass, no large enough packets put into prober. - now_ms += 6000; - EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + now += TimeDelta::seconds(6); + EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity()); // Check that legacy behaviour where prober is reset in TimeUntilNextProbe is // no longer there. Probes are no longer retried if they are timed out. prober.OnIncomingPacket(1000); - EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms)); + EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity()); } TEST(BitrateProberTest, DoesntInitializeProbingForSmallPackets) { @@ -111,11 +115,12 @@ TEST(BitrateProberTest, VerifyProbeSizeOnHighBitrate) { const FieldTrialBasedConfig config; BitrateProber prober(config); - constexpr unsigned kHighBitrateBps = 10000000; // 10 Mbps + const DataRate kHighBitrate = DataRate::kbps(10000); // 10 Mbps - prober.CreateProbeCluster(kHighBitrateBps, 0, /*cluster_id=*/0); + prober.CreateProbeCluster(kHighBitrate, Timestamp::ms(0), /*cluster_id=*/0); // Probe size should ensure a minimum of 1 ms interval. - EXPECT_GT(prober.RecommendedMinProbeSize(), kHighBitrateBps / 8000); + EXPECT_GT(prober.RecommendedMinProbeSize(), + (kHighBitrate * TimeDelta::ms(1)).bytes()); } TEST(BitrateProberTest, MinumumNumberOfProbingPackets) { @@ -123,14 +128,15 @@ TEST(BitrateProberTest, MinumumNumberOfProbingPackets) { BitrateProber prober(config); // Even when probing at a low bitrate we expect a minimum number // of packets to be sent. - constexpr int kBitrateBps = 100000; // 100 kbps - constexpr int kPacketSizeBytes = 1000; + const DataRate kBitrate = DataRate::kbps(100); + const int kPacketSizeBytes = 1000; - prober.CreateProbeCluster(kBitrateBps, 0, 0); + Timestamp now = Timestamp::ms(0); + prober.CreateProbeCluster(kBitrate, now, 0); prober.OnIncomingPacket(kPacketSizeBytes); for (int i = 0; i < 5; ++i) { EXPECT_TRUE(prober.IsProbing()); - prober.ProbeSent(0, kPacketSizeBytes); + prober.ProbeSent(now, kPacketSizeBytes); } EXPECT_FALSE(prober.IsProbing()); @@ -139,16 +145,17 @@ TEST(BitrateProberTest, MinumumNumberOfProbingPackets) { TEST(BitrateProberTest, ScaleBytesUsedForProbing) { const FieldTrialBasedConfig config; BitrateProber prober(config); - constexpr int kBitrateBps = 10000000; // 10 Mbps - constexpr int kPacketSizeBytes = 1000; - constexpr int kExpectedBytesSent = kBitrateBps * 15 / 8000; + const DataRate kBitrate = DataRate::kbps(10000); // 10 Mbps. + const int kPacketSizeBytes = 1000; + const int kExpectedBytesSent = (kBitrate * TimeDelta::ms(15)).bytes(); - prober.CreateProbeCluster(kBitrateBps, 0, /*cluster_id=*/0); + Timestamp now = Timestamp::ms(0); + prober.CreateProbeCluster(kBitrate, now, /*cluster_id=*/0); prober.OnIncomingPacket(kPacketSizeBytes); int bytes_sent = 0; while (bytes_sent < kExpectedBytesSent) { ASSERT_TRUE(prober.IsProbing()); - prober.ProbeSent(0, kPacketSizeBytes); + prober.ProbeSent(now, kPacketSizeBytes); bytes_sent += kPacketSizeBytes; } @@ -158,16 +165,17 @@ TEST(BitrateProberTest, ScaleBytesUsedForProbing) { TEST(BitrateProberTest, HighBitrateProbing) { const FieldTrialBasedConfig config; BitrateProber prober(config); - constexpr int kBitrateBps = 1000000000; // 1 Gbps. - constexpr int kPacketSizeBytes = 1000; - constexpr int kExpectedBytesSent = (kBitrateBps / 8000) * 15; + const DataRate kBitrate = DataRate::kbps(1000000); // 1 Gbps. + const int kPacketSizeBytes = 1000; + const int kExpectedBytesSent = (kBitrate * TimeDelta::ms(15)).bytes(); - prober.CreateProbeCluster(kBitrateBps, 0, 0); + Timestamp now = Timestamp::ms(0); + prober.CreateProbeCluster(kBitrate, now, 0); prober.OnIncomingPacket(kPacketSizeBytes); int bytes_sent = 0; while (bytes_sent < kExpectedBytesSent) { ASSERT_TRUE(prober.IsProbing()); - prober.ProbeSent(0, kPacketSizeBytes); + prober.ProbeSent(now, kPacketSizeBytes); bytes_sent += kPacketSizeBytes; } @@ -177,28 +185,28 @@ TEST(BitrateProberTest, HighBitrateProbing) { TEST(BitrateProberTest, ProbeClusterTimeout) { const FieldTrialBasedConfig config; BitrateProber prober(config); - constexpr int kBitrateBps = 300000; // 300 kbps - constexpr int kSmallPacketSize = 20; + const DataRate kBitrate = DataRate::kbps(300); + const int kSmallPacketSize = 20; // Expecting two probe clusters of 5 packets each. - constexpr int kExpectedBytesSent = 20 * 2 * 5; - constexpr int64_t kTimeoutMs = 5000; + const int kExpectedBytesSent = 20 * 2 * 5; + const TimeDelta kTimeout = TimeDelta::ms(5000); - int64_t now_ms = 0; - prober.CreateProbeCluster(kBitrateBps, now_ms, /*cluster_id=*/0); + Timestamp now = Timestamp::ms(0); + prober.CreateProbeCluster(kBitrate, now, /*cluster_id=*/0); prober.OnIncomingPacket(kSmallPacketSize); EXPECT_FALSE(prober.IsProbing()); - now_ms += kTimeoutMs; - prober.CreateProbeCluster(kBitrateBps / 10, now_ms, /*cluster_id=*/1); + now += kTimeout; + prober.CreateProbeCluster(kBitrate / 10, now, /*cluster_id=*/1); prober.OnIncomingPacket(kSmallPacketSize); EXPECT_FALSE(prober.IsProbing()); - now_ms += 1; - prober.CreateProbeCluster(kBitrateBps / 10, now_ms, /*cluster_id=*/2); + now += TimeDelta::ms(1); + prober.CreateProbeCluster(kBitrate / 10, now, /*cluster_id=*/2); prober.OnIncomingPacket(kSmallPacketSize); EXPECT_TRUE(prober.IsProbing()); int bytes_sent = 0; while (bytes_sent < kExpectedBytesSent) { ASSERT_TRUE(prober.IsProbing()); - prober.ProbeSent(0, kSmallPacketSize); + prober.ProbeSent(now, kSmallPacketSize); bytes_sent += kSmallPacketSize; } diff --git a/modules/pacing/paced_sender.cc b/modules/pacing/paced_sender.cc index 0579b8e292..a961f5b21b 100644 --- a/modules/pacing/paced_sender.cc +++ b/modules/pacing/paced_sender.cc @@ -36,6 +36,7 @@ PacedSender::PacedSender(Clock* clock, static_cast(this), event_log, field_trials), + clock_(clock), packet_router_(packet_router), process_thread_(process_thread) { if (process_thread_) @@ -136,9 +137,9 @@ int64_t PacedSender::TimeUntilNextProcess() { .ms(); } - auto next_probe = pacing_controller_.TimeUntilNextProbe(); - if (next_probe) { - return next_probe->ms(); + Timestamp next_probe = pacing_controller_.NextProbeTime(); + if (next_probe != Timestamp::PlusInfinity()) { + return std::max(TimeDelta::Zero(), next_probe - clock_->CurrentTime()).ms(); } const TimeDelta min_packet_limit = TimeDelta::ms(5); diff --git a/modules/pacing/paced_sender.h b/modules/pacing/paced_sender.h index f28cb63e50..3539c53619 100644 --- a/modules/pacing/paced_sender.h +++ b/modules/pacing/paced_sender.h @@ -165,6 +165,7 @@ class PacedSender : public Module, rtc::CriticalSection critsect_; PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); + Clock* const clock_; PacketRouter* const packet_router_; ProcessThread* const process_thread_; }; diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index b603bce394..4a5eadd86b 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -123,7 +123,7 @@ PacingController::PacingController(Clock* clock, PacingController::~PacingController() = default; void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) { - prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id); + prober_.CreateProbeCluster(bitrate, CurrentTime(), cluster_id); } void PacingController::Pause() { @@ -233,10 +233,10 @@ TimeDelta PacingController::OldestPacketWaitTime() const { void PacingController::EnqueuePacketInternal( std::unique_ptr packet, int priority) { - Timestamp now = CurrentTime(); prober_.OnIncomingPacket(packet->payload_size()); // TODO(sprang): Make sure tests respect this, replace with DCHECK. + Timestamp now = CurrentTime(); if (packet->capture_time_ms() < 0) { packet->set_capture_time_ms(now.ms()); } @@ -272,19 +272,26 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const { return false; } -absl::optional PacingController::TimeUntilNextProbe() { +Timestamp PacingController::NextProbeTime() { if (!prober_.IsProbing()) { - return absl::nullopt; + return Timestamp::PlusInfinity(); } - TimeDelta time_delta = - TimeDelta::ms(prober_.TimeUntilNextProbe(CurrentTime().ms())); - if (time_delta > TimeDelta::Zero() || - (time_delta == TimeDelta::Zero() && !probing_send_failure_)) { - return time_delta; + Timestamp now = CurrentTime(); + Timestamp probe_time = prober_.NextProbeTime(now); + if (probe_time.IsInfinite()) { + return probe_time; } - return absl::nullopt; + if (probe_time > now) { + return probe_time; + } + + if (probing_send_failure_ || now - probe_time > TimeDelta::ms(1)) { + return Timestamp::PlusInfinity(); + } + + return probe_time; } TimeDelta PacingController::TimeElapsedSinceLastProcess() const { @@ -400,7 +407,7 @@ void PacingController::ProcessPackets() { if (is_probing) { probing_send_failure_ = data_sent == DataSize::Zero(); if (!probing_send_failure_) { - prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes()); + prober_.ProbeSent(CurrentTime(), data_sent.bytes()); } } } diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index d0e68a9a71..6f3f9fb487 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -118,10 +118,11 @@ class PacingController { // effect. void SetProbingEnabled(bool enabled); - // Time until next probe should be sent. If this value is set, it should be + // Time at which next probe should be sent. If this value is set, it should be // respected - i.e. don't call ProcessPackets() before this specified time as // that can have unintended side effects. - absl::optional TimeUntilNextProbe(); + // If no scheduled probe, Timestamp::PlusInifinity() is returned. + Timestamp NextProbeTime(); // Time since ProcessPackets() was last executed. TimeDelta TimeElapsedSinceLastProcess() const; diff --git a/modules/pacing/pacing_controller_unittest.cc b/modules/pacing/pacing_controller_unittest.cc index caec575233..361be0dc3f 100644 --- a/modules/pacing/pacing_controller_unittest.cc +++ b/modules/pacing/pacing_controller_unittest.cc @@ -152,11 +152,19 @@ class PacingControllerProbing : public PacingController::PacketSender { std::vector> GeneratePadding( DataSize target_size) override { + // From RTPSender: + // Max in the RFC 3550 is 255 bytes, we limit it to be modulus 32 for SRTP. + const DataSize kMaxPadding = DataSize::bytes(224); + std::vector> packets; - packets.emplace_back(std::make_unique(nullptr)); - packets.back()->SetPadding(target_size.bytes()); - packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding); - padding_sent_ += target_size.bytes(); + while (target_size > DataSize::Zero()) { + DataSize padding_size = std::min(kMaxPadding, target_size); + packets.emplace_back(std::make_unique(nullptr)); + packets.back()->SetPadding(padding_size.bytes()); + packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding); + padding_sent_ += padding_size.bytes(); + target_size -= padding_size; + } return packets; } @@ -246,9 +254,9 @@ class PacingControllerTest : public ::testing::Test { TimeDelta::Zero()); } - auto next_probe = pacer_->TimeUntilNextProbe(); - if (next_probe) { - return *next_probe; + Timestamp next_probe = pacer_->NextProbeTime(); + if (next_probe != Timestamp::PlusInfinity()) { + return std::max(TimeDelta::Zero(), next_probe - clock_.CurrentTime()); } const TimeDelta min_packet_limit = TimeDelta::ms(5); @@ -1105,6 +1113,65 @@ TEST_F(PacingControllerTest, ProbingWithInsertedPackets) { kSecondClusterRate.bps(), kProbingErrorMargin.bps()); } +TEST_F(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) { + const size_t kPacketSize = 1200; + const int kInitialBitrateBps = 300000; + uint32_t ssrc = 12346; + uint16_t sequence_number = 1234; + + PacingControllerProbing packet_sender; + pacer_ = std::make_unique(&clock_, &packet_sender, nullptr, + nullptr); + pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier), + DataRate::Zero()); + + for (int i = 0; i < 10; ++i) { + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + } + while (pacer_->QueueSizePackets() > 0) { + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + } + + // Probe at a very high rate. + pacer_->CreateProbeCluster(DataRate::kbps(10000), // 10 Mbps. + /*cluster_id=*/3); + // We need one packet to start the probe. + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + const int packets_sent_before_probe = packet_sender.packets_sent(); + clock_.AdvanceTime(TimeUntilNextProcess()); + pacer_->ProcessPackets(); + EXPECT_EQ(packet_sender.packets_sent(), packets_sent_before_probe + 1); + + // Figure out how long between probe packets. + Timestamp start_time = clock_.CurrentTime(); + clock_.AdvanceTime(TimeUntilNextProcess()); + TimeDelta time_between_probes = clock_.CurrentTime() - start_time; + // Advance that distance again + 1ms. + clock_.AdvanceTime(time_between_probes); + + // Send second probe packet. + Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++, + clock_.TimeInMilliseconds(), kPacketSize); + pacer_->ProcessPackets(); + EXPECT_EQ(packet_sender.packets_sent(), packets_sent_before_probe + 2); + + // We're exactly where we should be for the next probe. + EXPECT_TRUE(pacer_->NextProbeTime().IsFinite()); + + // Advance to within one millisecond past where the next probe should be sent, + // will still indicate "process immediately". + clock_.AdvanceTime(TimeDelta::us(500)); + EXPECT_TRUE(pacer_->NextProbeTime().IsFinite()); + + // We've gone more than one millisecond past the time for the next probe + // packet, it will dropped. + clock_.AdvanceTime(TimeDelta::ms(1)); + EXPECT_EQ(pacer_->NextProbeTime(), Timestamp::PlusInfinity()); +} + TEST_F(PacingControllerTest, ProbingWithPaddingSupport) { const size_t kPacketSize = 1200; const int kInitialBitrateBps = 300000;