Reland "Refactors BitrateProber with unit types and absolute probe time."

This is a reland of 739a5b3692880cb6b41ae620fb9e755c39b044b1

Patchset 1 is the original CL, patchset 3 includes a fix

Original change's description:
> Refactors BitrateProber with unit types and absolute probe time.
>
> Using unit types improves readability and some conversion in PacedSender
> can be removed.
>
> TimeUntilNextProbe() is replaced by NextProbeTime(), so returning an
> absolute time rather than a delta. This fits better with the upcoming
> TaskQueue based pacer, and is also what is already stored internally
> in BitrateProber.
>
> Bug: webrtc:10809
> Change-Id: I5a4e289d2b53e99d3c0a2f4b36a966dba759d5cf
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158743
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Reviewed-by: Sebastian Jansson <srte@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#29670}

Bug: webrtc:10809
Change-Id: I033193c78474fdd82c109fdab0a8f09a05f7b30e
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/158841
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29688}
This commit is contained in:
Erik Språng
2019-11-05 11:21:48 +01:00
committed by Commit Bot
parent 1983458981
commit b210eeb812
8 changed files with 210 additions and 126 deletions

View File

@ -28,7 +28,7 @@ namespace {
// we have a min probe packet size of 200 bytes.
constexpr size_t kMinProbePacketSize = 200;
constexpr int64_t kProbeClusterTimeoutMs = 5000;
constexpr TimeDelta kProbeClusterTimeout = TimeDelta::Seconds<5>();
} // namespace
@ -55,7 +55,7 @@ BitrateProber::~BitrateProber() {
BitrateProber::BitrateProber(const WebRtcKeyValueConfig& field_trials)
: probing_state_(ProbingState::kDisabled),
next_probe_time_ms_(-1),
next_probe_time_(Timestamp::PlusInfinity()),
total_probe_count_(0),
total_failed_probe_count_(0),
config_(&field_trials) {
@ -85,32 +85,31 @@ void BitrateProber::OnIncomingPacket(size_t packet_size) {
packet_size >=
std::min<size_t>(RecommendedMinProbeSize(), kMinProbePacketSize)) {
// Send next probe right away.
next_probe_time_ms_ = -1;
next_probe_time_ = Timestamp::MinusInfinity();
probing_state_ = ProbingState::kActive;
}
}
void BitrateProber::CreateProbeCluster(int bitrate_bps,
int64_t now_ms,
void BitrateProber::CreateProbeCluster(DataRate bitrate,
Timestamp now,
int cluster_id) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
RTC_DCHECK_GT(bitrate_bps, 0);
RTC_DCHECK_GT(bitrate, DataRate::Zero());
total_probe_count_++;
while (!clusters_.empty() &&
now_ms - clusters_.front().time_created_ms > kProbeClusterTimeoutMs) {
now - clusters_.front().created_at > kProbeClusterTimeout) {
clusters_.pop();
total_failed_probe_count_++;
}
ProbeCluster cluster;
cluster.time_created_ms = now_ms;
cluster.created_at = now;
cluster.pace_info.probe_cluster_min_probes = config_.min_probe_packets_sent;
cluster.pace_info.probe_cluster_min_bytes =
static_cast<int32_t>(static_cast<int64_t>(bitrate_bps) *
config_.min_probe_duration->ms() / 8000);
(bitrate * config_.min_probe_duration.Get()).bytes();
RTC_DCHECK_GE(cluster.pace_info.probe_cluster_min_bytes, 0);
cluster.pace_info.send_bitrate_bps = bitrate_bps;
cluster.pace_info.send_bitrate_bps = bitrate.bps();
cluster.pace_info.probe_cluster_id = cluster_id;
clusters_.push(cluster);
@ -124,23 +123,21 @@ void BitrateProber::CreateProbeCluster(int bitrate_bps,
probing_state_ = ProbingState::kInactive;
}
int BitrateProber::TimeUntilNextProbe(int64_t now_ms) {
Timestamp BitrateProber::NextProbeTime(Timestamp now) const {
// Probing is not active or probing is already complete.
if (probing_state_ != ProbingState::kActive || clusters_.empty())
return -1;
int time_until_probe_ms = 0;
if (next_probe_time_ms_ >= 0) {
time_until_probe_ms = next_probe_time_ms_ - now_ms;
if (time_until_probe_ms < -config_.max_probe_delay->ms()) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
<< " (next_ms:" << next_probe_time_ms_
<< ", now_ms: " << now_ms << ")";
return -1;
}
if (probing_state_ != ProbingState::kActive || clusters_.empty()) {
return Timestamp::PlusInfinity();
}
return std::max(time_until_probe_ms, 0);
if (next_probe_time_.IsFinite() &&
now - next_probe_time_ > config_.max_probe_delay.Get()) {
RTC_DLOG(LS_WARNING) << "Probe delay too high"
<< " (next_ms:" << next_probe_time_.ms()
<< ", now_ms: " << now.ms() << ")";
return Timestamp::PlusInfinity();
}
return next_probe_time_;
}
PacedPacketInfo BitrateProber::CurrentCluster() const {
@ -160,19 +157,19 @@ size_t BitrateProber::RecommendedMinProbeSize() const {
config_.min_probe_delta->ms() / (8 * 1000);
}
void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
void BitrateProber::ProbeSent(Timestamp now, size_t bytes) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK_GT(bytes, 0);
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
if (cluster->sent_probes == 0) {
RTC_DCHECK_EQ(cluster->time_started_ms, -1);
cluster->time_started_ms = now_ms;
RTC_DCHECK(cluster->started_at.IsInfinite());
cluster->started_at = now;
}
cluster->sent_bytes += static_cast<int>(bytes);
cluster->sent_probes += 1;
next_probe_time_ms_ = GetNextProbeTime(*cluster);
next_probe_time_ = CalculateNextProbeTime(*cluster);
if (cluster->sent_bytes >= cluster->pace_info.probe_cluster_min_bytes &&
cluster->sent_probes >= cluster->pace_info.probe_cluster_min_probes) {
RTC_HISTOGRAM_COUNTS_100000("WebRTC.BWE.Probing.ProbeClusterSizeInBytes",
@ -180,7 +177,7 @@ void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
RTC_HISTOGRAM_COUNTS_100("WebRTC.BWE.Probing.ProbesPerCluster",
cluster->sent_probes);
RTC_HISTOGRAM_COUNTS_10000("WebRTC.BWE.Probing.TimePerProbeCluster",
now_ms - cluster->time_started_ms);
(now - cluster->started_at).ms());
clusters_.pop();
}
@ -189,16 +186,17 @@ void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
}
}
int64_t BitrateProber::GetNextProbeTime(const ProbeCluster& cluster) {
Timestamp BitrateProber::CalculateNextProbeTime(
const ProbeCluster& cluster) const {
RTC_CHECK_GT(cluster.pace_info.send_bitrate_bps, 0);
RTC_CHECK_GE(cluster.time_started_ms, 0);
RTC_CHECK(cluster.started_at.IsFinite());
// Compute the time delta from the cluster start to ensure probe bitrate stays
// close to the target bitrate. Result is in milliseconds.
int64_t delta_ms =
(8000ll * cluster.sent_bytes + cluster.pace_info.send_bitrate_bps / 2) /
cluster.pace_info.send_bitrate_bps;
return cluster.time_started_ms + delta_ms;
DataSize sent_bytes = DataSize::bytes(cluster.sent_bytes);
DataRate send_bitrate = DataRate::bps(cluster.pace_info.send_bitrate_bps);
TimeDelta delta = sent_bytes / send_bitrate;
return cluster.started_at + delta;
}
} // namespace webrtc

View File

@ -61,11 +61,12 @@ class BitrateProber {
// Create a cluster used to probe for |bitrate_bps| with |num_probes| number
// of probes.
void CreateProbeCluster(int bitrate_bps, int64_t now_ms, int cluster_id);
void CreateProbeCluster(DataRate bitrate, Timestamp now, int cluster_id);
// Returns the number of milliseconds until the next probe should be sent to
// get accurate probing.
int TimeUntilNextProbe(int64_t now_ms);
// Returns the at which the next probe should be sent to get accurate probing.
// If probing is not desired at this time, Timestamp::PlusInfinity() will be
// returned.
Timestamp NextProbeTime(Timestamp now) const;
// Information about the current probing cluster.
PacedPacketInfo CurrentCluster() const;
@ -78,7 +79,7 @@ class BitrateProber {
// multiple packets per probe, this call would be made at the end of sending
// the last packet in probe. |probe_size| is the total size of all packets
// in probe.
void ProbeSent(int64_t now_ms, size_t probe_size);
void ProbeSent(Timestamp now, size_t probe_size);
private:
enum class ProbingState {
@ -101,12 +102,12 @@ class BitrateProber {
int sent_probes = 0;
int sent_bytes = 0;
int64_t time_created_ms = -1;
int64_t time_started_ms = -1;
Timestamp created_at = Timestamp::MinusInfinity();
Timestamp started_at = Timestamp::MinusInfinity();
int retries = 0;
};
int64_t GetNextProbeTime(const ProbeCluster& cluster);
Timestamp CalculateNextProbeTime(const ProbeCluster& cluster) const;
ProbingState probing_state_;
@ -116,7 +117,7 @@ class BitrateProber {
std::queue<ProbeCluster> clusters_;
// Time the next probe should be sent when in kActive state.
int64_t next_probe_time_ms_;
Timestamp next_probe_time_;
int total_probe_count_;
int total_failed_probe_count_;

View File

@ -10,6 +10,8 @@
#include "modules/pacing/bitrate_prober.h"
#include <algorithm>
#include "test/gtest.h"
namespace webrtc {
@ -19,17 +21,18 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
BitrateProber prober(config);
EXPECT_FALSE(prober.IsProbing());
int64_t now_ms = 0;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
Timestamp now = Timestamp::ms(0);
const Timestamp start_time = now;
EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity());
const int kTestBitrate1 = 900000;
const int kTestBitrate2 = 1800000;
const DataRate kTestBitrate1 = DataRate::kbps(900);
const DataRate kTestBitrate2 = DataRate::kbps(1800);
const int kClusterSize = 5;
const int kProbeSize = 1000;
const int kMinProbeDurationMs = 15;
const TimeDelta kMinProbeDuration = TimeDelta::ms(15);
prober.CreateProbeCluster(kTestBitrate1, now_ms, 0);
prober.CreateProbeCluster(kTestBitrate2, now_ms, 1);
prober.CreateProbeCluster(kTestBitrate1, now, 0);
prober.CreateProbeCluster(kTestBitrate2, now, 1);
EXPECT_FALSE(prober.IsProbing());
prober.OnIncomingPacket(kProbeSize);
@ -37,39 +40,40 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id);
// First packet should probe as soon as possible.
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
EXPECT_EQ(Timestamp::MinusInfinity(), prober.NextProbeTime(now));
for (int i = 0; i < kClusterSize; ++i) {
now_ms += prober.TimeUntilNextProbe(now_ms);
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
now = std::max(now, prober.NextProbeTime(now));
EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now)));
EXPECT_EQ(0, prober.CurrentCluster().probe_cluster_id);
prober.ProbeSent(now_ms, kProbeSize);
prober.ProbeSent(now, kProbeSize);
}
EXPECT_GE(now_ms, kMinProbeDurationMs);
EXPECT_GE(now - start_time, kMinProbeDuration);
// Verify that the actual bitrate is withing 10% of the target.
double bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / now_ms;
DataRate bitrate =
DataSize::bytes(kProbeSize * (kClusterSize - 1)) / (now - start_time);
EXPECT_GT(bitrate, kTestBitrate1 * 0.9);
EXPECT_LT(bitrate, kTestBitrate1 * 1.1);
now_ms += prober.TimeUntilNextProbe(now_ms);
int64_t probe2_started = now_ms;
now = std::max(now, prober.NextProbeTime(now));
Timestamp probe2_started = now;
for (int i = 0; i < kClusterSize; ++i) {
now_ms += prober.TimeUntilNextProbe(now_ms);
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
now = std::max(now, prober.NextProbeTime(now));
EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now)));
EXPECT_EQ(1, prober.CurrentCluster().probe_cluster_id);
prober.ProbeSent(now_ms, kProbeSize);
prober.ProbeSent(now, kProbeSize);
}
// Verify that the actual bitrate is withing 10% of the target.
int duration = now_ms - probe2_started;
EXPECT_GE(duration, kMinProbeDurationMs);
bitrate = kProbeSize * (kClusterSize - 1) * 8 * 1000.0 / duration;
TimeDelta duration = now - probe2_started;
EXPECT_GE(duration, kMinProbeDuration);
bitrate = DataSize::bytes(kProbeSize * (kClusterSize - 1)) / duration;
EXPECT_GT(bitrate, kTestBitrate2 * 0.9);
EXPECT_LT(bitrate, kTestBitrate2 * 1.1);
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity());
EXPECT_FALSE(prober.IsProbing());
}
@ -77,23 +81,23 @@ TEST(BitrateProberTest, DoesntProbeWithoutRecentPackets) {
const FieldTrialBasedConfig config;
BitrateProber prober(config);
int64_t now_ms = 0;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
Timestamp now = Timestamp::Zero();
EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity());
prober.CreateProbeCluster(900000, now_ms, 0);
prober.CreateProbeCluster(DataRate::kbps(900), now, 0);
EXPECT_FALSE(prober.IsProbing());
prober.OnIncomingPacket(1000);
EXPECT_TRUE(prober.IsProbing());
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
prober.ProbeSent(now_ms, 1000);
EXPECT_EQ(now, std::max(now, prober.NextProbeTime(now)));
prober.ProbeSent(now, 1000);
// Let time pass, no large enough packets put into prober.
now_ms += 6000;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
now += TimeDelta::seconds(6);
EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity());
// Check that legacy behaviour where prober is reset in TimeUntilNextProbe is
// no longer there. Probes are no longer retried if they are timed out.
prober.OnIncomingPacket(1000);
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
EXPECT_EQ(prober.NextProbeTime(now), Timestamp::PlusInfinity());
}
TEST(BitrateProberTest, DoesntInitializeProbingForSmallPackets) {
@ -111,11 +115,12 @@ TEST(BitrateProberTest, VerifyProbeSizeOnHighBitrate) {
const FieldTrialBasedConfig config;
BitrateProber prober(config);
constexpr unsigned kHighBitrateBps = 10000000; // 10 Mbps
const DataRate kHighBitrate = DataRate::kbps(10000); // 10 Mbps
prober.CreateProbeCluster(kHighBitrateBps, 0, /*cluster_id=*/0);
prober.CreateProbeCluster(kHighBitrate, Timestamp::ms(0), /*cluster_id=*/0);
// Probe size should ensure a minimum of 1 ms interval.
EXPECT_GT(prober.RecommendedMinProbeSize(), kHighBitrateBps / 8000);
EXPECT_GT(prober.RecommendedMinProbeSize(),
(kHighBitrate * TimeDelta::ms(1)).bytes<size_t>());
}
TEST(BitrateProberTest, MinumumNumberOfProbingPackets) {
@ -123,14 +128,15 @@ TEST(BitrateProberTest, MinumumNumberOfProbingPackets) {
BitrateProber prober(config);
// Even when probing at a low bitrate we expect a minimum number
// of packets to be sent.
constexpr int kBitrateBps = 100000; // 100 kbps
constexpr int kPacketSizeBytes = 1000;
const DataRate kBitrate = DataRate::kbps(100);
const int kPacketSizeBytes = 1000;
prober.CreateProbeCluster(kBitrateBps, 0, 0);
Timestamp now = Timestamp::ms(0);
prober.CreateProbeCluster(kBitrate, now, 0);
prober.OnIncomingPacket(kPacketSizeBytes);
for (int i = 0; i < 5; ++i) {
EXPECT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kPacketSizeBytes);
prober.ProbeSent(now, kPacketSizeBytes);
}
EXPECT_FALSE(prober.IsProbing());
@ -139,16 +145,17 @@ TEST(BitrateProberTest, MinumumNumberOfProbingPackets) {
TEST(BitrateProberTest, ScaleBytesUsedForProbing) {
const FieldTrialBasedConfig config;
BitrateProber prober(config);
constexpr int kBitrateBps = 10000000; // 10 Mbps
constexpr int kPacketSizeBytes = 1000;
constexpr int kExpectedBytesSent = kBitrateBps * 15 / 8000;
const DataRate kBitrate = DataRate::kbps(10000); // 10 Mbps.
const int kPacketSizeBytes = 1000;
const int kExpectedBytesSent = (kBitrate * TimeDelta::ms(15)).bytes();
prober.CreateProbeCluster(kBitrateBps, 0, /*cluster_id=*/0);
Timestamp now = Timestamp::ms(0);
prober.CreateProbeCluster(kBitrate, now, /*cluster_id=*/0);
prober.OnIncomingPacket(kPacketSizeBytes);
int bytes_sent = 0;
while (bytes_sent < kExpectedBytesSent) {
ASSERT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kPacketSizeBytes);
prober.ProbeSent(now, kPacketSizeBytes);
bytes_sent += kPacketSizeBytes;
}
@ -158,16 +165,17 @@ TEST(BitrateProberTest, ScaleBytesUsedForProbing) {
TEST(BitrateProberTest, HighBitrateProbing) {
const FieldTrialBasedConfig config;
BitrateProber prober(config);
constexpr int kBitrateBps = 1000000000; // 1 Gbps.
constexpr int kPacketSizeBytes = 1000;
constexpr int kExpectedBytesSent = (kBitrateBps / 8000) * 15;
const DataRate kBitrate = DataRate::kbps(1000000); // 1 Gbps.
const int kPacketSizeBytes = 1000;
const int kExpectedBytesSent = (kBitrate * TimeDelta::ms(15)).bytes();
prober.CreateProbeCluster(kBitrateBps, 0, 0);
Timestamp now = Timestamp::ms(0);
prober.CreateProbeCluster(kBitrate, now, 0);
prober.OnIncomingPacket(kPacketSizeBytes);
int bytes_sent = 0;
while (bytes_sent < kExpectedBytesSent) {
ASSERT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kPacketSizeBytes);
prober.ProbeSent(now, kPacketSizeBytes);
bytes_sent += kPacketSizeBytes;
}
@ -177,28 +185,28 @@ TEST(BitrateProberTest, HighBitrateProbing) {
TEST(BitrateProberTest, ProbeClusterTimeout) {
const FieldTrialBasedConfig config;
BitrateProber prober(config);
constexpr int kBitrateBps = 300000; // 300 kbps
constexpr int kSmallPacketSize = 20;
const DataRate kBitrate = DataRate::kbps(300);
const int kSmallPacketSize = 20;
// Expecting two probe clusters of 5 packets each.
constexpr int kExpectedBytesSent = 20 * 2 * 5;
constexpr int64_t kTimeoutMs = 5000;
const int kExpectedBytesSent = 20 * 2 * 5;
const TimeDelta kTimeout = TimeDelta::ms(5000);
int64_t now_ms = 0;
prober.CreateProbeCluster(kBitrateBps, now_ms, /*cluster_id=*/0);
Timestamp now = Timestamp::ms(0);
prober.CreateProbeCluster(kBitrate, now, /*cluster_id=*/0);
prober.OnIncomingPacket(kSmallPacketSize);
EXPECT_FALSE(prober.IsProbing());
now_ms += kTimeoutMs;
prober.CreateProbeCluster(kBitrateBps / 10, now_ms, /*cluster_id=*/1);
now += kTimeout;
prober.CreateProbeCluster(kBitrate / 10, now, /*cluster_id=*/1);
prober.OnIncomingPacket(kSmallPacketSize);
EXPECT_FALSE(prober.IsProbing());
now_ms += 1;
prober.CreateProbeCluster(kBitrateBps / 10, now_ms, /*cluster_id=*/2);
now += TimeDelta::ms(1);
prober.CreateProbeCluster(kBitrate / 10, now, /*cluster_id=*/2);
prober.OnIncomingPacket(kSmallPacketSize);
EXPECT_TRUE(prober.IsProbing());
int bytes_sent = 0;
while (bytes_sent < kExpectedBytesSent) {
ASSERT_TRUE(prober.IsProbing());
prober.ProbeSent(0, kSmallPacketSize);
prober.ProbeSent(now, kSmallPacketSize);
bytes_sent += kSmallPacketSize;
}

View File

@ -36,6 +36,7 @@ PacedSender::PacedSender(Clock* clock,
static_cast<PacingController::PacketSender*>(this),
event_log,
field_trials),
clock_(clock),
packet_router_(packet_router),
process_thread_(process_thread) {
if (process_thread_)
@ -136,9 +137,9 @@ int64_t PacedSender::TimeUntilNextProcess() {
.ms();
}
auto next_probe = pacing_controller_.TimeUntilNextProbe();
if (next_probe) {
return next_probe->ms();
Timestamp next_probe = pacing_controller_.NextProbeTime();
if (next_probe != Timestamp::PlusInfinity()) {
return std::max(TimeDelta::Zero(), next_probe - clock_->CurrentTime()).ms();
}
const TimeDelta min_packet_limit = TimeDelta::ms(5);

View File

@ -165,6 +165,7 @@ class PacedSender : public Module,
rtc::CriticalSection critsect_;
PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
Clock* const clock_;
PacketRouter* const packet_router_;
ProcessThread* const process_thread_;
};

View File

@ -123,7 +123,7 @@ PacingController::PacingController(Clock* clock,
PacingController::~PacingController() = default;
void PacingController::CreateProbeCluster(DataRate bitrate, int cluster_id) {
prober_.CreateProbeCluster(bitrate.bps(), CurrentTime().ms(), cluster_id);
prober_.CreateProbeCluster(bitrate, CurrentTime(), cluster_id);
}
void PacingController::Pause() {
@ -233,10 +233,10 @@ TimeDelta PacingController::OldestPacketWaitTime() const {
void PacingController::EnqueuePacketInternal(
std::unique_ptr<RtpPacketToSend> packet,
int priority) {
Timestamp now = CurrentTime();
prober_.OnIncomingPacket(packet->payload_size());
// TODO(sprang): Make sure tests respect this, replace with DCHECK.
Timestamp now = CurrentTime();
if (packet->capture_time_ms() < 0) {
packet->set_capture_time_ms(now.ms());
}
@ -272,19 +272,26 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
return false;
}
absl::optional<TimeDelta> PacingController::TimeUntilNextProbe() {
Timestamp PacingController::NextProbeTime() {
if (!prober_.IsProbing()) {
return absl::nullopt;
return Timestamp::PlusInfinity();
}
TimeDelta time_delta =
TimeDelta::ms(prober_.TimeUntilNextProbe(CurrentTime().ms()));
if (time_delta > TimeDelta::Zero() ||
(time_delta == TimeDelta::Zero() && !probing_send_failure_)) {
return time_delta;
Timestamp now = CurrentTime();
Timestamp probe_time = prober_.NextProbeTime(now);
if (probe_time.IsInfinite()) {
return probe_time;
}
return absl::nullopt;
if (probe_time > now) {
return probe_time;
}
if (probing_send_failure_ || now - probe_time > TimeDelta::ms(1)) {
return Timestamp::PlusInfinity();
}
return probe_time;
}
TimeDelta PacingController::TimeElapsedSinceLastProcess() const {
@ -400,7 +407,7 @@ void PacingController::ProcessPackets() {
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
prober_.ProbeSent(CurrentTime().ms(), data_sent.bytes());
prober_.ProbeSent(CurrentTime(), data_sent.bytes());
}
}
}

View File

@ -118,10 +118,11 @@ class PacingController {
// effect.
void SetProbingEnabled(bool enabled);
// Time until next probe should be sent. If this value is set, it should be
// Time at which next probe should be sent. If this value is set, it should be
// respected - i.e. don't call ProcessPackets() before this specified time as
// that can have unintended side effects.
absl::optional<TimeDelta> TimeUntilNextProbe();
// If no scheduled probe, Timestamp::PlusInifinity() is returned.
Timestamp NextProbeTime();
// Time since ProcessPackets() was last executed.
TimeDelta TimeElapsedSinceLastProcess() const;

View File

@ -152,11 +152,19 @@ class PacingControllerProbing : public PacingController::PacketSender {
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize target_size) override {
// From RTPSender:
// Max in the RFC 3550 is 255 bytes, we limit it to be modulus 32 for SRTP.
const DataSize kMaxPadding = DataSize::bytes(224);
std::vector<std::unique_ptr<RtpPacketToSend>> packets;
packets.emplace_back(std::make_unique<RtpPacketToSend>(nullptr));
packets.back()->SetPadding(target_size.bytes());
packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding);
padding_sent_ += target_size.bytes();
while (target_size > DataSize::Zero()) {
DataSize padding_size = std::min(kMaxPadding, target_size);
packets.emplace_back(std::make_unique<RtpPacketToSend>(nullptr));
packets.back()->SetPadding(padding_size.bytes());
packets.back()->set_packet_type(RtpPacketToSend::Type::kPadding);
padding_sent_ += padding_size.bytes();
target_size -= padding_size;
}
return packets;
}
@ -246,9 +254,9 @@ class PacingControllerTest : public ::testing::Test {
TimeDelta::Zero());
}
auto next_probe = pacer_->TimeUntilNextProbe();
if (next_probe) {
return *next_probe;
Timestamp next_probe = pacer_->NextProbeTime();
if (next_probe != Timestamp::PlusInfinity()) {
return std::max(TimeDelta::Zero(), next_probe - clock_.CurrentTime());
}
const TimeDelta min_packet_limit = TimeDelta::ms(5);
@ -1105,6 +1113,65 @@ TEST_F(PacingControllerTest, ProbingWithInsertedPackets) {
kSecondClusterRate.bps(), kProbingErrorMargin.bps());
}
TEST_F(PacingControllerTest, SkipsProbesWhenProcessIntervalTooLarge) {
const size_t kPacketSize = 1200;
const int kInitialBitrateBps = 300000;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
PacingControllerProbing packet_sender;
pacer_ = std::make_unique<PacingController>(&clock_, &packet_sender, nullptr,
nullptr);
pacer_->SetPacingRates(DataRate::bps(kInitialBitrateBps * kPaceMultiplier),
DataRate::Zero());
for (int i = 0; i < 10; ++i) {
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
}
while (pacer_->QueueSizePackets() > 0) {
clock_.AdvanceTime(TimeUntilNextProcess());
pacer_->ProcessPackets();
}
// Probe at a very high rate.
pacer_->CreateProbeCluster(DataRate::kbps(10000), // 10 Mbps.
/*cluster_id=*/3);
// We need one packet to start the probe.
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
const int packets_sent_before_probe = packet_sender.packets_sent();
clock_.AdvanceTime(TimeUntilNextProcess());
pacer_->ProcessPackets();
EXPECT_EQ(packet_sender.packets_sent(), packets_sent_before_probe + 1);
// Figure out how long between probe packets.
Timestamp start_time = clock_.CurrentTime();
clock_.AdvanceTime(TimeUntilNextProcess());
TimeDelta time_between_probes = clock_.CurrentTime() - start_time;
// Advance that distance again + 1ms.
clock_.AdvanceTime(time_between_probes);
// Send second probe packet.
Send(RtpPacketToSend::Type::kVideo, ssrc, sequence_number++,
clock_.TimeInMilliseconds(), kPacketSize);
pacer_->ProcessPackets();
EXPECT_EQ(packet_sender.packets_sent(), packets_sent_before_probe + 2);
// We're exactly where we should be for the next probe.
EXPECT_TRUE(pacer_->NextProbeTime().IsFinite());
// Advance to within one millisecond past where the next probe should be sent,
// will still indicate "process immediately".
clock_.AdvanceTime(TimeDelta::us(500));
EXPECT_TRUE(pacer_->NextProbeTime().IsFinite());
// We've gone more than one millisecond past the time for the next probe
// packet, it will dropped.
clock_.AdvanceTime(TimeDelta::ms(1));
EXPECT_EQ(pacer_->NextProbeTime(), Timestamp::PlusInfinity());
}
TEST_F(PacingControllerTest, ProbingWithPaddingSupport) {
const size_t kPacketSize = 1200;
const int kInitialBitrateBps = 300000;