Optionally allows TaskQueuePacedSender to coalesce send events.

With an optional parameter this allows the task-queue based paced
sender to mimic the old behavior and coalesce sending of packets in
order to reduce thread wakeups and provide opportunity for batching.
This is done by simply overriding the minimum time the thread should
sleep. The pacing controller will already handle the "late wakup" case
and send any packets as if it had been woken at the optimal time.

Bug: webrtc:10809
Change-Id: Iceea00693a4e87d39b0e0ee8bdabca081dff2cba
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/175648
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31328}
This commit is contained in:
Erik Språng
2020-05-19 17:40:58 +02:00
committed by Commit Bot
parent 2671dac29c
commit 4ab61cb9b4
6 changed files with 209 additions and 114 deletions

View File

@ -91,13 +91,16 @@ RtpTransportControllerSend::RtpTransportControllerSend(
event_log,
trials,
process_thread_.get())),
task_queue_pacer_(use_task_queue_pacer_
? new TaskQueuePacedSender(clock,
&packet_router_,
event_log,
trials,
task_queue_factory)
: nullptr),
task_queue_pacer_(
use_task_queue_pacer_
? new TaskQueuePacedSender(
clock,
&packet_router_,
event_log,
trials,
task_queue_factory,
/*hold_back_window = */ PacingController::kMinSleepTime)
: nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(

View File

@ -193,6 +193,10 @@ bool PacingController::Congested() const {
return false;
}
bool PacingController::IsProbing() const {
return prober_.is_probing();
}
Timestamp PacingController::CurrentTime() const {
Timestamp time = clock_->CurrentTime();
if (time < last_timestamp_) {

View File

@ -146,6 +146,8 @@ class PacingController {
bool Congested() const;
bool IsProbing() const;
private:
void EnqueuePacketInternal(std::unique_ptr<RtpPacketToSend> packet,
int priority);

View File

@ -34,8 +34,10 @@ TaskQueuePacedSender::TaskQueuePacedSender(
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory)
TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window)
: clock_(clock),
hold_back_window_(hold_back_window),
packet_router_(packet_router),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
@ -200,8 +202,10 @@ void TaskQueuePacedSender::MaybeProcessPackets(
next_process_time = pacing_controller_.NextSendTime();
}
next_process_time =
std::max(now + PacingController::kMinSleepTime, next_process_time);
const TimeDelta min_sleep = pacing_controller_.IsProbing()
? PacingController::kMinSleepTime
: hold_back_window_;
next_process_time = std::max(now + min_sleep, next_process_time);
TimeDelta sleep_time = next_process_time - now;
if (next_process_time_.IsMinusInfinity() ||

View File

@ -42,11 +42,18 @@ class TaskQueuePacedSender : public RtpPacketPacer,
public RtpPacketSender,
private PacingController::PacketSender {
public:
TaskQueuePacedSender(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory);
// The |hold_back_window| parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
// latency.
// TODO(bugs.webrtc.org/10809): Remove default value for hold_back_window.
TaskQueuePacedSender(
Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
TimeDelta hold_back_window = PacingController::kMinSleepTime);
~TaskQueuePacedSender() override;
@ -131,6 +138,7 @@ class TaskQueuePacedSender : public RtpPacketPacer,
Stats GetStats() const;
Clock* const clock_;
const TimeDelta hold_back_window_;
PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);

View File

@ -24,6 +24,7 @@
#include "test/time_controller/simulated_time_controller.h"
using ::testing::_;
using ::testing::AtLeast;
using ::testing::Return;
using ::testing::SaveArg;
@ -48,17 +49,6 @@ class MockPacketRouter : public PacketRouter {
namespace test {
class TaskQueuePacedSenderTest : public ::testing::Test {
public:
TaskQueuePacedSenderTest()
: time_controller_(Timestamp::Millis(1234)),
pacer_(time_controller_.GetClock(),
&packet_router_,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller_.GetTaskQueueFactory()) {}
protected:
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketMediaType type) {
auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
@ -92,109 +82,193 @@ class TaskQueuePacedSenderTest : public ::testing::Test {
return packets;
}
Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); }
TEST(TaskQueuePacedSenderTest, PacesPackets) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime);
GlobalSimulatedTimeController time_controller_;
MockPacketRouter packet_router_;
TaskQueuePacedSender pacer_;
};
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
pacer.SetPacingRates(
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero());
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
TEST_F(TaskQueuePacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
pacer_.SetPacingRates(
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero());
pacer_.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsToSend));
// Expect all of them to be sent.
size_t packets_sent = 0;
Timestamp end_time = Timestamp::PlusInfinity();
EXPECT_CALL(packet_router, SendPacket)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
++packets_sent;
if (packets_sent == kPacketsToSend) {
end_time = time_controller.GetClock()->CurrentTime();
}
});
// Expect all of them to be sent.
size_t packets_sent = 0;
Timestamp end_time = Timestamp::PlusInfinity();
EXPECT_CALL(packet_router_, SendPacket)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
++packets_sent;
if (packets_sent == kPacketsToSend) {
end_time = time_controller_.GetClock()->CurrentTime();
}
});
const Timestamp start_time = time_controller.GetClock()->CurrentTime();
const Timestamp start_time = time_controller_.GetClock()->CurrentTime();
// Packets should be sent over a period of close to 1s. Expect a little
// lower than this since initial probing is a bit quicker.
time_controller.AdvanceTime(TimeDelta::Seconds(1));
EXPECT_EQ(packets_sent, kPacketsToSend);
ASSERT_TRUE(end_time.IsFinite());
EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
}
// Packets should be sent over a period of close to 1s. Expect a little lower
// than this since initial probing is a bit quicker.
time_controller_.AdvanceTime(TimeDelta::Seconds(1));
EXPECT_EQ(packets_sent, kPacketsToSend);
ASSERT_TRUE(end_time.IsFinite());
EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
}
TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime);
TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
const DataRate kPacingRate =
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
pacer_.SetPacingRates(kPacingRate, DataRate::Zero());
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
const DataRate kPacingRate =
DataRate::BitsPerSec(kDefaultPacketSize * 8 * kPacketsPerSecond);
pacer.SetPacingRates(kPacingRate, DataRate::Zero());
// Send some initial packets to be rid of any probes.
EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond);
pacer_.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
time_controller_.AdvanceTime(TimeDelta::Seconds(1));
// Send some initial packets to be rid of any probes.
EXPECT_CALL(packet_router, SendPacket).Times(kPacketsPerSecond);
pacer.EnqueuePackets(
GeneratePackets(RtpPacketMediaType::kVideo, kPacketsPerSecond));
time_controller.AdvanceTime(TimeDelta::Seconds(1));
// Insert three packets, and record send time of each of them.
// After the second packet is sent, double the send rate so we can
// check the third packets is sent after half the wait time.
Timestamp first_packet_time = Timestamp::MinusInfinity();
Timestamp second_packet_time = Timestamp::MinusInfinity();
Timestamp third_packet_time = Timestamp::MinusInfinity();
// Insert three packets, and record send time of each of them.
// After the second packet is sent, double the send rate so we can
// check the third packets is sent after half the wait time.
Timestamp first_packet_time = Timestamp::MinusInfinity();
Timestamp second_packet_time = Timestamp::MinusInfinity();
Timestamp third_packet_time = Timestamp::MinusInfinity();
EXPECT_CALL(packet_router_, SendPacket)
.Times(3)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
if (first_packet_time.IsInfinite()) {
first_packet_time = CurrentTime();
} else if (second_packet_time.IsInfinite()) {
second_packet_time = CurrentTime();
pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero());
} else {
third_packet_time = CurrentTime();
}
});
EXPECT_CALL(packet_router, SendPacket)
.Times(3)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
if (first_packet_time.IsInfinite()) {
first_packet_time = time_controller.GetClock()->CurrentTime();
} else if (second_packet_time.IsInfinite()) {
second_packet_time = time_controller.GetClock()->CurrentTime();
pacer.SetPacingRates(2 * kPacingRate, DataRate::Zero());
} else {
third_packet_time = time_controller.GetClock()->CurrentTime();
}
});
pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
time_controller_.AdvanceTime(TimeDelta::Millis(500));
ASSERT_TRUE(third_packet_time.IsFinite());
EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
1.0);
EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
1.0);
}
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 3));
time_controller.AdvanceTime(TimeDelta::Millis(500));
ASSERT_TRUE(third_packet_time.IsFinite());
EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
1.0);
EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
1.0);
}
TEST_F(TaskQueuePacedSenderTest, SendsAudioImmediately) {
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime);
pacer_.SetPacingRates(kPacingDataRate, DataRate::Zero());
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = kPacketSize / kPacingDataRate;
// Add some initial video packets, only one should be sent.
EXPECT_CALL(packet_router_, SendPacket);
pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router_);
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
// Advance time, but still before next packet should be sent.
time_controller_.AdvanceTime(kPacketPacingTime / 2);
// Add some initial video packets, only one should be sent.
EXPECT_CALL(packet_router, SendPacket);
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
time_controller.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router);
// Insert an audio packet, it should be sent immediately.
EXPECT_CALL(packet_router_, SendPacket);
pacer_.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
time_controller_.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router_);
}
// Advance time, but still before next packet should be sent.
time_controller.AdvanceTime(kPacketPacingTime / 2);
// Insert an audio packet, it should be sent immediately.
EXPECT_CALL(packet_router, SendPacket);
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kAudio, 1));
time_controller.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router);
}
TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
// Add 10 packets. The first should be sent immediately since the buffers
// are clear.
EXPECT_CALL(packet_router, SendPacket);
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
time_controller.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router);
// Advance time to 1ms before the coalescing window ends. No packets should
// be sent.
EXPECT_CALL(packet_router, SendPacket).Times(0);
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
// Advance time to where coalescing window ends. All packets that should
// have been sent up til now will be sent.
EXPECT_CALL(packet_router, SendPacket).Times(5);
time_controller.AdvanceTime(TimeDelta::Millis(1));
::testing::Mock::VerifyAndClearExpectations(&packet_router);
}
TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
const TimeDelta kPacketPacingTime = TimeDelta::Millis(1);
const DataRate kPacingDataRate = kPacketSize / kPacketPacingTime;
pacer.SetPacingRates(kPacingDataRate, DataRate::Zero());
// Add 10 packets. The first should be sent immediately since the buffers
// are clear. This will also trigger the probe to start.
EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
pacer.CreateProbeCluster(kPacingDataRate * 2, 17);
pacer.EnqueuePackets(GeneratePackets(RtpPacketMediaType::kVideo, 10));
time_controller.AdvanceTime(TimeDelta::Zero());
::testing::Mock::VerifyAndClearExpectations(&packet_router);
// Advance time to 1ms before the coalescing window ends. Packets should be
// flying.
EXPECT_CALL(packet_router, SendPacket).Times(AtLeast(1));
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
} // namespace test
} // namespace webrtc