BitrateProber: Support higher probing bitrates

Currently, BitrateProber does not scale higher than 2 Mbps to 6 Mbps. The actual
number is dependent on the size of the last packet. If a packet of around 250
bytes is used for probing, it fails above 2 Mbps.

BitrateProber now provides a recommendation on probe size instead of a
packet size. PacedSender utilizes this to decide on the number of packets
per probe. This enables BitrateProber to scale up-to higher bitrates.

Tests with chromoting show it stalls at about 10 Mbps (perhaps due to the
limitation on the simulation pipeline to deliver packets).

BUG=webrtc:6332

Review-Url: https://codereview.webrtc.org/2347023002
Cr-Commit-Position: refs/heads/master@{#14503}
This commit is contained in:
isheriff
2016-10-04 08:29:38 -07:00
committed by Commit bot
parent 409f573f8e
commit cc5903e15f
7 changed files with 238 additions and 132 deletions

View File

@ -23,17 +23,20 @@ namespace {
// Inactivity threshold above which probing is restarted.
constexpr int kInactivityThresholdMs = 5000;
int ComputeDeltaFromBitrate(size_t packet_size, uint32_t bitrate_bps) {
// A minimum interval between probes to allow scheduling to be feasible.
constexpr int kMinProbeDeltaMs = 1;
int ComputeDeltaFromBitrate(size_t probe_size, uint32_t bitrate_bps) {
RTC_CHECK_GT(bitrate_bps, 0u);
// Compute the time delta needed to send packet_size bytes at bitrate_bps
// Compute the time delta needed to send probe_size bytes at bitrate_bps
// bps. Result is in milliseconds.
return static_cast<int>((1000ll * packet_size * 8) / bitrate_bps);
return static_cast<int>((1000ll * probe_size * 8) / bitrate_bps);
}
} // namespace
BitrateProber::BitrateProber()
: probing_state_(ProbingState::kDisabled),
packet_size_last_sent_(0),
probe_size_last_sent_(0),
time_last_probe_sent_ms_(-1),
next_cluster_id_(0) {
SetEnabled(true);
@ -65,15 +68,15 @@ void BitrateProber::OnIncomingPacket(size_t packet_size) {
}
}
void BitrateProber::CreateProbeCluster(int bitrate_bps, int num_packets) {
void BitrateProber::CreateProbeCluster(int bitrate_bps, int num_probes) {
RTC_DCHECK(probing_state_ != ProbingState::kDisabled);
ProbeCluster cluster;
cluster.max_probe_packets = num_packets;
cluster.max_probes = num_probes;
cluster.probe_bitrate_bps = bitrate_bps;
cluster.id = next_cluster_id_++;
clusters_.push(cluster);
LOG(LS_INFO) << "Probe cluster (bitrate:packets): ("
<< cluster.probe_bitrate_bps << ":" << cluster.max_probe_packets
LOG(LS_INFO) << "Probe cluster (bitrate:probes): ("
<< cluster.probe_bitrate_bps << ":" << cluster.max_probes
<< ") ";
if (probing_state_ != ProbingState::kActive)
probing_state_ = ProbingState::kInactive;
@ -81,14 +84,14 @@ void BitrateProber::CreateProbeCluster(int bitrate_bps, int num_packets) {
void BitrateProber::ResetState() {
time_last_probe_sent_ms_ = -1;
packet_size_last_sent_ = 0;
probe_size_last_sent_ = 0;
// Recreate all probing clusters.
std::queue<ProbeCluster> clusters;
clusters.swap(clusters_);
while (!clusters.empty()) {
CreateProbeCluster(clusters.front().probe_bitrate_bps,
clusters.front().max_probe_packets);
clusters.front().max_probes);
clusters.pop();
}
// If its enabled, reset to inactive.
@ -116,13 +119,10 @@ int BitrateProber::TimeUntilNextProbe(int64_t now_ms) {
// We will send the first probe packet immediately if no packet has been
// sent before.
int time_until_probe_ms = 0;
if (packet_size_last_sent_ != 0 && probing_state_ == ProbingState::kActive) {
if (probe_size_last_sent_ != 0 && probing_state_ == ProbingState::kActive) {
int next_delta_ms = ComputeDeltaFromBitrate(
packet_size_last_sent_, clusters_.front().probe_bitrate_bps);
probe_size_last_sent_, clusters_.front().probe_bitrate_bps);
time_until_probe_ms = next_delta_ms - elapsed_time_ms;
// There is no point in trying to probe with less than 1 ms between packets
// as it essentially means trying to probe at infinite bandwidth.
const int kMinProbeDeltaMs = 1;
// If we have waited more than 3 ms for a new packet to probe with we will
// consider this probing session over.
const int kMaxProbeDelayMs = 3;
@ -142,22 +142,24 @@ int BitrateProber::CurrentClusterId() const {
return clusters_.front().id;
}
size_t BitrateProber::RecommendedPacketSize() const {
return packet_size_last_sent_;
// Probe size is recommended based on the probe bitrate required. We choose
// a minimum of twice |kMinProbeDeltaMs| interval to allow scheduling to be
// feasible.
size_t BitrateProber::RecommendedMinProbeSize() const {
RTC_DCHECK(!clusters_.empty());
return clusters_.front().probe_bitrate_bps * 2 * kMinProbeDeltaMs /
(8 * 1000);
}
void BitrateProber::PacketSent(int64_t now_ms, size_t packet_size) {
assert(packet_size > 0);
if (packet_size < PacedSender::kMinProbePacketSize)
return;
packet_size_last_sent_ = packet_size;
if (probing_state_ != ProbingState::kActive)
return;
void BitrateProber::ProbeSent(int64_t now_ms, size_t bytes) {
RTC_DCHECK(probing_state_ == ProbingState::kActive);
RTC_DCHECK_GT(bytes, 0u);
probe_size_last_sent_ = bytes;
time_last_probe_sent_ms_ = now_ms;
if (!clusters_.empty()) {
ProbeCluster* cluster = &clusters_.front();
++cluster->sent_probe_packets;
if (cluster->sent_probe_packets == cluster->max_probe_packets)
++cluster->sent_probes;
if (cluster->sent_probes == cluster->max_probes)
clusters_.pop();
if (clusters_.empty())
probing_state_ = ProbingState::kSuspended;

View File

@ -36,24 +36,26 @@ class BitrateProber {
// with.
void OnIncomingPacket(size_t packet_size);
// Create a cluster used to probe for |bitrate_bps| with |num_packets| number
// of packets.
void CreateProbeCluster(int bitrate_bps, int num_packets);
// Create a cluster used to probe for |bitrate_bps| with |num_probes| number
// of probes.
void CreateProbeCluster(int bitrate_bps, int num_probes);
// Returns the number of milliseconds until the next packet should be sent to
// Returns the number of milliseconds until the next probe should be sent to
// get accurate probing.
int TimeUntilNextProbe(int64_t now_ms);
// Which cluster that is currently being used for probing.
int CurrentClusterId() const;
// Returns the number of bytes that the prober recommends for the next probe
// packet.
size_t RecommendedPacketSize() const;
// Returns the minimum number of bytes that the prober recommends for
// the next probe.
size_t RecommendedMinProbeSize() const;
// Called to report to the prober that a packet has been sent, which helps the
// prober know when to move to the next packet in a probe.
void PacketSent(int64_t now_ms, size_t packet_size);
// Called to report to the prober that a probe has been sent. In case of
// multiple packets per probe, this call would be made at the end of sending
// the last packet in probe. |probe_size| is the total size of all packets
// in probe.
void ProbeSent(int64_t now_ms, size_t probe_size);
private:
enum class ProbingState {
@ -69,9 +71,11 @@ class BitrateProber {
kSuspended,
};
// A probe cluster consists of a set of probes. Each probe in turn can be
// divided into a number of packets to accomodate the MTU on the network.
struct ProbeCluster {
int max_probe_packets = 0;
int sent_probe_packets = 0;
int max_probes = 0;
int sent_probes = 0;
int probe_bitrate_bps = 0;
int id = -1;
};
@ -84,7 +88,8 @@ class BitrateProber {
// the previous probe packet based on the size and time when that packet was
// sent.
std::queue<ProbeCluster> clusters_;
size_t packet_size_last_sent_;
// A probe can include one or more packets.
size_t probe_size_last_sent_;
// The last time a probe was sent.
int64_t time_last_probe_sent_ms_;
int next_cluster_id_;

View File

@ -31,7 +31,7 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
// First packet should probe as soon as possible.
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
prober.PacketSent(now_ms, 1000);
prober.ProbeSent(now_ms, 1000);
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(8, prober.TimeUntilNextProbe(now_ms));
@ -40,14 +40,14 @@ TEST(BitrateProberTest, VerifyStatesAndTimeBetweenProbes) {
now_ms += 4;
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
EXPECT_EQ(0, prober.CurrentClusterId());
prober.PacketSent(now_ms, 1000);
prober.ProbeSent(now_ms, 1000);
}
for (int i = 0; i < 5; ++i) {
EXPECT_EQ(4, prober.TimeUntilNextProbe(now_ms));
now_ms += 4;
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
EXPECT_EQ(1, prober.CurrentClusterId());
prober.PacketSent(now_ms, 1000);
prober.ProbeSent(now_ms, 1000);
}
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
@ -66,19 +66,19 @@ TEST(BitrateProberTest, DoesntProbeWithoutRecentPackets) {
prober.OnIncomingPacket(1000);
EXPECT_TRUE(prober.IsProbing());
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
prober.PacketSent(now_ms, 1000);
prober.ProbeSent(now_ms, 1000);
// Let time pass, no large enough packets put into prober.
now_ms += 6000;
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
// Insert a small packet, not a candidate for probing.
prober.OnIncomingPacket(100);
prober.PacketSent(now_ms, 100);
EXPECT_FALSE(prober.IsProbing());
EXPECT_EQ(-1, prober.TimeUntilNextProbe(now_ms));
// Insert a large-enough packet after downtime while probing should reset to
// perform a new probe since the requested one didn't finish.
prober.OnIncomingPacket(1000);
EXPECT_EQ(0, prober.TimeUntilNextProbe(now_ms));
prober.PacketSent(now_ms, 1000);
prober.ProbeSent(now_ms, 1000);
// Next packet should be part of new probe and be sent with non-zero delay.
prober.OnIncomingPacket(1000);
EXPECT_GT(prober.TimeUntilNextProbe(now_ms), 0);
@ -93,4 +93,13 @@ TEST(BitrateProberTest, DoesntInitializeProbingForSmallPackets) {
EXPECT_FALSE(prober.IsProbing());
}
TEST(BitrateProberTest, VerifyProbeSizeOnHighBitrate) {
BitrateProber prober;
constexpr unsigned kHighBitrateBps = 10000000; // 10 Mbps
prober.CreateProbeCluster(kHighBitrateBps, 6);
// Probe size should ensure a minimum of 1 ms interval.
EXPECT_GT(prober.RecommendedMinProbeSize(), kHighBitrateBps / 8000);
}
} // namespace webrtc

View File

@ -402,8 +402,13 @@ void PacedSender::Process() {
}
bool is_probing = prober_->IsProbing();
int probe_cluster_id = is_probing ? prober_->CurrentClusterId()
: PacketInfo::kNotAProbe;
int probe_cluster_id = PacketInfo::kNotAProbe;
size_t bytes_sent = 0;
size_t recommended_probe_size = 0;
if (is_probing) {
probe_cluster_id = prober_->CurrentClusterId();
recommended_probe_size = prober_->RecommendedMinProbeSize();
}
while (!packets_->Empty()) {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
@ -412,30 +417,32 @@ void PacedSender::Process() {
if (SendPacket(packet, probe_cluster_id)) {
// Send succeeded, remove it from the queue.
bytes_sent += packet.bytes;
packets_->FinalizePop(packet);
if (is_probing)
return;
if (is_probing && bytes_sent > recommended_probe_size)
break;
} else {
// Send failed, put it back into the queue.
packets_->CancelPop(packet);
return;
break;
}
}
RTC_DCHECK(packets_->Empty());
// TODO(holmer): Remove the paused_ check when issue 5307 has been fixed.
if (paused_)
return;
if (packets_->Empty() && !paused_) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
int padding_needed =
static_cast<int>(is_probing ? (recommended_probe_size - bytes_sent)
: padding_budget_->bytes_remaining());
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
size_t padding_needed = is_probing ? prober_->RecommendedPacketSize()
: padding_budget_->bytes_remaining();
if (padding_needed > 0)
SendPadding(padding_needed, probe_cluster_id);
if (padding_needed > 0)
bytes_sent += SendPadding(padding_needed, probe_cluster_id);
}
}
if (is_probing && bytes_sent > 0)
prober_->ProbeSent(clock_->TimeInMilliseconds(), bytes_sent);
}
bool PacedSender::SendPacket(const paced_sender::Packet& packet,
@ -458,7 +465,6 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet,
critsect_->Enter();
if (success) {
prober_->PacketSent(clock_->TimeInMilliseconds(), packet.bytes);
// TODO(holmer): High priority packets should only be accounted for if we
// are allocating bandwidth for audio.
if (packet.priority != kHighPriority) {
@ -471,17 +477,17 @@ bool PacedSender::SendPacket(const paced_sender::Packet& packet,
return success;
}
void PacedSender::SendPadding(size_t padding_needed, int probe_cluster_id) {
size_t PacedSender::SendPadding(size_t padding_needed, int probe_cluster_id) {
critsect_->Leave();
size_t bytes_sent =
packet_sender_->TimeToSendPadding(padding_needed, probe_cluster_id);
critsect_->Enter();
if (bytes_sent > 0) {
prober_->PacketSent(clock_->TimeInMilliseconds(), bytes_sent);
media_budget_->UseBudget(bytes_sent);
padding_budget_->UseBudget(bytes_sent);
}
return bytes_sent;
}
void PacedSender::UpdateBytesPerInterval(int64_t delta_time_ms) {

View File

@ -137,7 +137,7 @@ class PacedSender : public Module, public RtpPacketSender {
bool SendPacket(const paced_sender::Packet& packet, int probe_cluster_id)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void SendPadding(size_t padding_needed, int probe_cluster_id)
size_t SendPadding(size_t padding_needed, int probe_cluster_id)
EXCLUSIVE_LOCKS_REQUIRED(critsect_);
Clock* const clock_;

View File

@ -19,6 +19,18 @@
using testing::_;
using testing::Return;
namespace {
constexpr unsigned kFirstClusterBps = 900000;
constexpr int kFirstClusterCount = 6;
constexpr unsigned kSecondClusterBps = 1800000;
constexpr int kSecondClusterCount = 5;
// The error stems from truncating the time interval of probe packets to integer
// values. This results in probing slightly higher than the target bitrate.
// For 1.8 Mbps, this comes to be about 120 kbps with 1200 probe packets.
constexpr int kBitrateProbingError = 150000;
} // namespace
namespace webrtc {
namespace test {
@ -62,46 +74,29 @@ class PacedSenderPadding : public PacedSender::PacketSender {
class PacedSenderProbing : public PacedSender::PacketSender {
public:
PacedSenderProbing(const std::list<int>& expected_deltas, Clock* clock)
: prev_packet_time_ms_(-1),
expected_deltas_(expected_deltas),
packets_sent_(0),
clock_(clock) {}
PacedSenderProbing() : packets_sent_(0), padding_sent_(0) {}
bool TimeToSendPacket(uint32_t ssrc,
uint16_t sequence_number,
int64_t capture_time_ms,
bool retransmission,
int probe_cluster_id) override {
ExpectAndCountPacket();
packets_sent_++;
return true;
}
size_t TimeToSendPadding(size_t bytes, int probe_cluster_id) override {
ExpectAndCountPacket();
return bytes;
}
void ExpectAndCountPacket() {
++packets_sent_;
EXPECT_FALSE(expected_deltas_.empty());
if (expected_deltas_.empty())
return;
int64_t now_ms = clock_->TimeInMilliseconds();
if (prev_packet_time_ms_ >= 0) {
EXPECT_EQ(expected_deltas_.front(), now_ms - prev_packet_time_ms_);
expected_deltas_.pop_front();
}
prev_packet_time_ms_ = now_ms;
padding_sent_ += bytes;
return padding_sent_;
}
int packets_sent() const { return packets_sent_; }
int padding_sent() const { return padding_sent_; }
private:
int64_t prev_packet_time_ms_;
std::list<int> expected_deltas_;
int packets_sent_;
Clock* clock_;
int padding_sent_;
};
class PacedSenderTest : public ::testing::Test {
@ -110,8 +105,8 @@ class PacedSenderTest : public ::testing::Test {
srand(0);
// Need to initialize PacedSender after we initialize clock.
send_bucket_.reset(new PacedSender(&clock_, &callback_));
send_bucket_->CreateProbeCluster(900000, 6);
send_bucket_->CreateProbeCluster(1800000, 5);
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->CreateProbeCluster(kSecondClusterBps, kSecondClusterCount);
// Default to bitrate probing disabled for testing purposes. Probing tests
// have to enable probing, either by creating a new PacedSender instance or
// by calling SetProbingEnabled(true).
@ -803,74 +798,86 @@ TEST_F(PacedSenderTest, QueueTimeGrowsOverTime) {
EXPECT_EQ(0, send_bucket_->QueueInMs());
}
TEST_F(PacedSenderTest, ProbingWithInitialFrame) {
const int kNumPackets = 11;
const int kNumDeltas = kNumPackets - 1;
TEST_F(PacedSenderTest, ProbingWithInsertedPackets) {
const size_t kPacketSize = 1200;
const int kInitialBitrateBps = 300000;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
const int expected_deltas[kNumDeltas] = {10, 10, 10, 10, 10, 5, 5, 5, 5, 5};
std::list<int> expected_deltas_list(expected_deltas,
expected_deltas + kNumDeltas);
PacedSenderProbing callback(expected_deltas_list, &clock_);
send_bucket_.reset(new PacedSender(&clock_, &callback));
send_bucket_->CreateProbeCluster(900000, 6);
send_bucket_->CreateProbeCluster(1800000, 5);
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender));
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->CreateProbeCluster(kSecondClusterBps, kSecondClusterCount);
send_bucket_->SetEstimatedBitrate(kInitialBitrateBps);
for (int i = 0; i < kNumPackets; ++i) {
for (int i = 0; i < kFirstClusterCount + kSecondClusterCount; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
}
while (callback.packets_sent() < kNumPackets) {
int64_t start = clock_.TimeInMilliseconds();
while (packet_sender.packets_sent() < kFirstClusterCount) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
if (time_until_process <= 0) {
send_bucket_->Process();
} else {
clock_.AdvanceTimeMilliseconds(time_until_process);
}
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
}
int packets_sent = packet_sender.packets_sent();
// Validate first cluster bitrate. Note that we have to account for number
// of intervals and hence (packets_sent - 1) on the first cluster.
EXPECT_NEAR((packets_sent - 1) * kPacketSize * 8000 /
(clock_.TimeInMilliseconds() - start),
kFirstClusterBps, kBitrateProbingError);
EXPECT_EQ(0, packet_sender.padding_sent());
start = clock_.TimeInMilliseconds();
while (packet_sender.packets_sent() <
kFirstClusterCount + kSecondClusterCount) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
}
packets_sent = packet_sender.packets_sent() - packets_sent;
// Validate second cluster bitrate.
EXPECT_NEAR(
packets_sent * kPacketSize * 8000 / (clock_.TimeInMilliseconds() - start),
kSecondClusterBps, kBitrateProbingError);
}
TEST_F(PacedSenderTest, ProbingWithTooSmallInitialFrame) {
const int kNumPackets = 11;
const int kNumDeltas = kNumPackets - 1;
TEST_F(PacedSenderTest, ProbingWithPaddingSupport) {
const size_t kPacketSize = 1200;
const int kInitialBitrateBps = 300000;
uint32_t ssrc = 12346;
uint16_t sequence_number = 1234;
const int expected_deltas[kNumDeltas] = {10, 10, 10, 10, 10, 5, 5, 5, 5, 5};
std::list<int> expected_deltas_list(expected_deltas,
expected_deltas + kNumDeltas);
PacedSenderProbing callback(expected_deltas_list, &clock_);
send_bucket_.reset(new PacedSender(&clock_, &callback));
send_bucket_->CreateProbeCluster(900000, 6);
send_bucket_->CreateProbeCluster(1800000, 5);
PacedSenderProbing packet_sender;
send_bucket_.reset(new PacedSender(&clock_, &packet_sender));
send_bucket_->CreateProbeCluster(kFirstClusterBps, kFirstClusterCount);
send_bucket_->SetEstimatedBitrate(kInitialBitrateBps);
for (int i = 0; i < kNumPackets - 5; ++i) {
for (int i = 0; i < kFirstClusterCount - 2; ++i) {
send_bucket_->InsertPacket(PacedSender::kNormalPriority, ssrc,
sequence_number++, clock_.TimeInMilliseconds(),
kPacketSize, false);
}
while (callback.packets_sent() < kNumPackets) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
if (time_until_process <= 0) {
send_bucket_->Process();
} else {
clock_.AdvanceTimeMilliseconds(time_until_process);
}
}
// Process one more time and make sure we don't send any more probes.
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
EXPECT_EQ(kNumPackets, callback.packets_sent());
int64_t start = clock_.TimeInMilliseconds();
int process_count = 0;
while (process_count < kFirstClusterCount) {
int time_until_process = send_bucket_->TimeUntilNextProcess();
clock_.AdvanceTimeMilliseconds(time_until_process);
send_bucket_->Process();
++process_count;
}
int packets_sent = packet_sender.packets_sent();
int padding_sent = packet_sender.padding_sent();
EXPECT_GT(packets_sent, 0);
EXPECT_GT(padding_sent, 0);
// Note that the number of intervals here for kPacketSize is
// packets_sent due to padding in the same cluster.
EXPECT_NEAR((packets_sent * kPacketSize * 8000 + padding_sent) /
(clock_.TimeInMilliseconds() - start),
kFirstClusterBps, kBitrateProbingError);
}
TEST_F(PacedSenderTest, PriorityInversion) {