Add support for dynamic processing mode in PacedSender.

Behind a default-disabled field trial.

Bug: webrtc:10809
Change-Id: If5d9b69721bd67e59e68b1026e3797e9a1b0a760
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/159783
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29802}
This commit is contained in:
Erik Språng
2019-11-14 18:04:49 +01:00
committed by Commit Bot
parent 02fac7d86e
commit 74f35e48d5
3 changed files with 109 additions and 57 deletions

View File

@ -32,11 +32,16 @@ PacedSender::PacedSender(Clock* clock,
RtcEventLog* event_log, RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials, const WebRtcKeyValueConfig* field_trials,
ProcessThread* process_thread) ProcessThread* process_thread)
: pacing_controller_(clock, : process_mode_((field_trials != nullptr &&
field_trials->Lookup("WebRTC-Pacer-DynamicProcess")
.find("Enabled") == 0)
? PacingController::ProcessMode::kDynamic
: PacingController::ProcessMode::kPeriodic),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this), static_cast<PacingController::PacketSender*>(this),
event_log, event_log,
field_trials, field_trials,
PacingController::ProcessMode::kPeriodic), process_mode_),
clock_(clock), clock_(clock),
packet_router_(packet_router), packet_router_(packet_router),
process_thread_(process_thread) { process_thread_(process_thread) {
@ -45,8 +50,9 @@ PacedSender::PacedSender(Clock* clock,
} }
PacedSender::~PacedSender() { PacedSender::~PacedSender() {
if (process_thread_) if (process_thread_) {
process_thread_->DeRegisterModule(&module_proxy_); process_thread_->DeRegisterModule(&module_proxy_);
}
} }
void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) { void PacedSender::CreateProbeCluster(DataRate bitrate, int cluster_id) {
@ -62,8 +68,9 @@ void PacedSender::Pause() {
// Tell the process thread to call our TimeUntilNextProcess() method to get // Tell the process thread to call our TimeUntilNextProcess() method to get
// a new (longer) estimate for when to call Process(). // a new (longer) estimate for when to call Process().
if (process_thread_) if (process_thread_) {
process_thread_->WakeUp(&module_proxy_); process_thread_->WakeUp(&module_proxy_);
}
} }
void PacedSender::Resume() { void PacedSender::Resume() {
@ -74,31 +81,44 @@ void PacedSender::Resume() {
// Tell the process thread to call our TimeUntilNextProcess() method to // Tell the process thread to call our TimeUntilNextProcess() method to
// refresh the estimate for when to call Process(). // refresh the estimate for when to call Process().
if (process_thread_) if (process_thread_) {
process_thread_->WakeUp(&module_proxy_); process_thread_->WakeUp(&module_proxy_);
}
} }
void PacedSender::SetCongestionWindow(DataSize congestion_window_size) { void PacedSender::SetCongestionWindow(DataSize congestion_window_size) {
rtc::CritScope cs(&critsect_); {
pacing_controller_.SetCongestionWindow(congestion_window_size); rtc::CritScope cs(&critsect_);
pacing_controller_.SetCongestionWindow(congestion_window_size);
}
MaybeWakupProcessThread();
} }
void PacedSender::UpdateOutstandingData(DataSize outstanding_data) { void PacedSender::UpdateOutstandingData(DataSize outstanding_data) {
rtc::CritScope cs(&critsect_); {
pacing_controller_.UpdateOutstandingData(outstanding_data); rtc::CritScope cs(&critsect_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
}
MaybeWakupProcessThread();
} }
void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) { void PacedSender::SetPacingRates(DataRate pacing_rate, DataRate padding_rate) {
rtc::CritScope cs(&critsect_); {
pacing_controller_.SetPacingRates(pacing_rate, padding_rate); rtc::CritScope cs(&critsect_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
}
MaybeWakupProcessThread();
} }
void PacedSender::EnqueuePackets( void PacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) { std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
rtc::CritScope cs(&critsect_); {
for (auto& packet : packets) { rtc::CritScope cs(&critsect_);
pacing_controller_.EnqueuePacket(std::move(packet)); for (auto& packet : packets) {
pacing_controller_.EnqueuePacket(std::move(packet));
}
} }
MaybeWakupProcessThread();
} }
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@ -144,9 +164,21 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
RTC_DCHECK(!process_thread || process_thread == process_thread_); RTC_DCHECK(!process_thread || process_thread == process_thread_);
} }
void PacedSender::MaybeWakupProcessThread() {
// Tell the process thread to call our TimeUntilNextProcess() method to get
// a new time for when to call Process().
if (process_thread_ &&
process_mode_ == PacingController::ProcessMode::kDynamic) {
process_thread_->WakeUp(&module_proxy_);
}
}
void PacedSender::SetQueueTimeLimit(TimeDelta limit) { void PacedSender::SetQueueTimeLimit(TimeDelta limit) {
rtc::CritScope cs(&critsect_); {
pacing_controller_.SetQueueTimeLimit(limit); rtc::CritScope cs(&critsect_);
pacing_controller_.SetQueueTimeLimit(limit);
}
MaybeWakupProcessThread();
} }
void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet, void PacedSender::SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,

View File

@ -134,9 +134,10 @@ class PacedSender : public Module,
// Called when the prober is associated with a process thread. // Called when the prober is associated with a process thread.
void ProcessThreadAttached(ProcessThread* process_thread) override; void ProcessThreadAttached(ProcessThread* process_thread) override;
private: // In dynamic process mode, refreshes the next process time.
// Methods implementing PacedSenderController:PacketSender. void MaybeWakupProcessThread();
// Methods implementing PacedSenderController:PacketSender.
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet, void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override const PacedPacketInfo& cluster_info) override
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
@ -163,6 +164,7 @@ class PacedSender : public Module,
} module_proxy_{this}; } module_proxy_{this};
rtc::CriticalSection critsect_; rtc::CriticalSection critsect_;
const PacingController::ProcessMode process_mode_;
PacingController pacing_controller_ RTC_GUARDED_BY(critsect_); PacingController pacing_controller_ RTC_GUARDED_BY(critsect_);
Clock* const clock_; Clock* const clock_;

View File

@ -39,7 +39,6 @@ constexpr size_t kDefaultPacketSize = 234;
namespace webrtc { namespace webrtc {
namespace test { namespace test {
// Mock callback implementing the raw api. // Mock callback implementing the raw api.
class MockCallback : public PacketRouter { class MockCallback : public PacketRouter {
public: public:
@ -51,69 +50,88 @@ class MockCallback : public PacketRouter {
std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes)); std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
}; };
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) { class PacedSenderTest
auto packet = std::make_unique<RtpPacketToSend>(nullptr); : public ::testing::TestWithParam<PacingController::ProcessMode> {
packet->set_packet_type(type); public:
switch (type) { PacedSenderTest() : clock_(0), paced_module_(nullptr) {}
case RtpPacketToSend::Type::kAudio:
packet->SetSsrc(kAudioSsrc); void SetUp() override {
break; EXPECT_CALL(process_thread_, RegisterModule)
case RtpPacketToSend::Type::kVideo: .WillOnce(SaveArg<0>(&paced_module_));
packet->SetSsrc(kVideoSsrc);
break; pacer_ = std::make_unique<PacedSender>(&clock_, &callback_, nullptr,
case RtpPacketToSend::Type::kRetransmission: nullptr, &process_thread_);
case RtpPacketToSend::Type::kPadding: EXPECT_CALL(process_thread_, DeRegisterModule(paced_module_)).Times(1);
packet->SetSsrc(kVideoRtxSsrc);
break;
case RtpPacketToSend::Type::kForwardErrorCorrection:
packet->SetSsrc(kFlexFecSsrc);
break;
} }
packet->SetPayloadSize(kDefaultPacketSize); protected:
return packet; std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
} auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
switch (type) {
case RtpPacketToSend::Type::kAudio:
packet->SetSsrc(kAudioSsrc);
break;
case RtpPacketToSend::Type::kVideo:
packet->SetSsrc(kVideoSsrc);
break;
case RtpPacketToSend::Type::kRetransmission:
case RtpPacketToSend::Type::kPadding:
packet->SetSsrc(kVideoRtxSsrc);
break;
case RtpPacketToSend::Type::kForwardErrorCorrection:
packet->SetSsrc(kFlexFecSsrc);
break;
}
TEST(PacedSenderTest, PacesPackets) { packet->SetPayloadSize(kDefaultPacketSize);
SimulatedClock clock(0); return packet;
MockCallback callback; }
MockProcessThread process_thread;
Module* paced_module = nullptr;
EXPECT_CALL(process_thread, RegisterModule(_, _))
.WillOnce(SaveArg<0>(&paced_module));
PacedSender pacer(&clock, &callback, nullptr, nullptr, &process_thread);
EXPECT_CALL(process_thread, DeRegisterModule(paced_module)).Times(1);
SimulatedClock clock_;
MockCallback callback_;
MockProcessThread process_thread_;
Module* paced_module_;
std::unique_ptr<PacedSender> pacer_;
};
TEST_P(PacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second. // Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42; static constexpr size_t kPacketsToSend = 42;
pacer.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend), pacer_->SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero()); DataRate::Zero());
std::vector<std::unique_ptr<RtpPacketToSend>> packets; std::vector<std::unique_ptr<RtpPacketToSend>> packets;
for (size_t i = 0; i < kPacketsToSend; ++i) { for (size_t i = 0; i < kPacketsToSend; ++i) {
packets.emplace_back(BuildRtpPacket(RtpPacketToSend::Type::kVideo)); packets.emplace_back(BuildRtpPacket(RtpPacketToSend::Type::kVideo));
} }
pacer.EnqueuePackets(std::move(packets)); pacer_->EnqueuePackets(std::move(packets));
// Expect all of them to be sent. // Expect all of them to be sent.
size_t packets_sent = 0; size_t packets_sent = 0;
clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
EXPECT_CALL(callback, SendPacket) EXPECT_CALL(callback_, SendPacket)
.WillRepeatedly( .WillRepeatedly(
[&](std::unique_ptr<RtpPacketToSend> packet, [&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) { ++packets_sent; }); const PacedPacketInfo& cluster_info) { ++packets_sent; });
const Timestamp start_time = clock.CurrentTime(); const Timestamp start_time = clock_.CurrentTime();
while (packets_sent < kPacketsToSend) { while (packets_sent < kPacketsToSend) {
clock.AdvanceTimeMilliseconds(paced_module->TimeUntilNextProcess()); clock_.AdvanceTimeMilliseconds(paced_module_->TimeUntilNextProcess());
paced_module->Process(); paced_module_->Process();
} }
// Packets should be sent over a period of close to 1s. Expect a little lower // Packets should be sent over a period of close to 1s. Expect a little lower
// than this since initial probing is a bit quicker. // than this since initial probing is a bit quicker.
TimeDelta duration = clock.CurrentTime() - start_time; TimeDelta duration = clock_.CurrentTime() - start_time;
EXPECT_GT(duration, TimeDelta::ms(900)); EXPECT_GT(duration, TimeDelta::ms(900));
} }
INSTANTIATE_TEST_SUITE_P(
WithAndWithoutDynamicProcess,
PacedSenderTest,
::testing::Values(PacingController::ProcessMode::kPeriodic,
PacingController::ProcessMode::kDynamic));
} // namespace test } // namespace test
} // namespace webrtc } // namespace webrtc