Implements a task-queue based PacedSender, wires it up for field trials

Bug: webrtc:10809
Change-Id: Ia181c16559f4598f32dd399c24802d0a289e250b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150942
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29946}
This commit is contained in:
Erik Språng
2019-11-26 17:48:49 +01:00
committed by Commit Bot
parent 5314b13a8d
commit 4314a494cf
8 changed files with 656 additions and 18 deletions

View File

@ -72,9 +72,24 @@ RtpTransportControllerSend::RtpTransportControllerSend(
const WebRtcKeyValueConfig* trials)
: clock_(clock),
event_log_(event_log),
field_trials_(trials ? trials : &fallback_field_trials_),
bitrate_configurator_(bitrate_config),
process_thread_(std::move(process_thread)),
pacer_(clock, &packet_router_, event_log, trials, process_thread_.get()),
use_task_queue_pacer_(IsEnabled(field_trials_, "WebRTC-TaskQueuePacer")),
process_thread_pacer_(use_task_queue_pacer_
? nullptr
: new PacedSender(clock,
&packet_router_,
event_log,
field_trials_,
process_thread_.get())),
task_queue_pacer_(use_task_queue_pacer_
? new TaskQueuePacedSender(clock,
&packet_router_,
event_log,
field_trials_,
task_queue_factory)
: nullptr),
observer_(nullptr),
controller_factory_override_(controller_factory),
controller_factory_fallback_(
@ -82,11 +97,12 @@ RtpTransportControllerSend::RtpTransportControllerSend(
process_interval_(controller_factory_fallback_->GetProcessInterval()),
last_report_block_time_(Timestamp::ms(clock_->TimeInMilliseconds())),
reset_feedback_on_route_change_(
!IsEnabled(trials, "WebRTC-Bwe-NoFeedbackReset")),
!IsEnabled(field_trials_, "WebRTC-Bwe-NoFeedbackReset")),
send_side_bwe_with_overhead_(
IsEnabled(trials, "WebRTC-SendSideBwe-WithOverhead")),
IsEnabled(field_trials_, "WebRTC-SendSideBwe-WithOverhead")),
add_pacing_to_cwin_(
IsEnabled(trials, "WebRTC-AddPacingToCongestionWindowPushback")),
IsEnabled(field_trials_,
"WebRTC-AddPacingToCongestionWindowPushback")),
transport_overhead_bytes_per_packet_(0),
network_available_(false),
retransmission_rate_limiter_(clock, kRetransmitWindowSizeMs),
@ -95,18 +111,22 @@ RtpTransportControllerSend::RtpTransportControllerSend(
TaskQueueFactory::Priority::NORMAL)) {
initial_config_.constraints = ConvertConstraints(bitrate_config, clock_);
initial_config_.event_log = event_log;
initial_config_.key_value_config = &trial_based_config_;
initial_config_.key_value_config = field_trials_;
RTC_DCHECK(bitrate_config.start_bitrate_bps > 0);
pacer()->SetPacingRates(DataRate::bps(bitrate_config.start_bitrate_bps),
DataRate::Zero());
if (!use_task_queue_pacer_) {
process_thread_->Start();
}
}
RtpTransportControllerSend::~RtpTransportControllerSend() {
if (!use_task_queue_pacer_) {
process_thread_->Stop();
}
}
RtpVideoSenderInterface* RtpTransportControllerSend::CreateRtpVideoSender(
std::map<uint32_t, RtpState> suspended_ssrcs,
@ -153,15 +173,17 @@ void RtpTransportControllerSend::UpdateControlState() {
}
RtpPacketPacer* RtpTransportControllerSend::pacer() {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
if (use_task_queue_pacer_) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
const RtpPacketPacer* RtpTransportControllerSend::pacer() const {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
if (use_task_queue_pacer_) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
rtc::TaskQueue* RtpTransportControllerSend::GetWorkerQueue() {
@ -183,9 +205,10 @@ RtpTransportControllerSend::transport_feedback_observer() {
}
RtpPacketSender* RtpTransportControllerSend::packet_sender() {
// TODO(bugs.webrtc.org/10809): Return reference to the correct
// pacer implementation.
return &pacer_;
if (use_task_queue_pacer_) {
return task_queue_pacer_.get();
}
return process_thread_pacer_.get();
}
void RtpTransportControllerSend::SetAllocatedSendBitrateLimits(

View File

@ -24,8 +24,10 @@
#include "call/rtp_video_sender.h"
#include "modules/congestion_controller/rtp/control_handler.h"
#include "modules/congestion_controller/rtp/transport_feedback_adapter.h"
#include "modules/pacing/paced_sender.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/pacing/task_queue_paced_sender.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/constructor_magic.h"
#include "rtc_base/network_route.h"
@ -137,13 +139,17 @@ class RtpTransportControllerSend final
Clock* const clock_;
RtcEventLog* const event_log_;
const FieldTrialBasedConfig trial_based_config_;
// TODO(sprang): Remove fallback field-trials.
const FieldTrialBasedConfig fallback_field_trials_;
const WebRtcKeyValueConfig* field_trials_;
PacketRouter packet_router_;
std::vector<std::unique_ptr<RtpVideoSenderInterface>> video_rtp_senders_;
RtpBitrateConfigurator bitrate_configurator_;
std::map<std::string, rtc::NetworkRoute> network_routes_;
const std::unique_ptr<ProcessThread> process_thread_;
PacedSender pacer_;
const bool use_task_queue_pacer_;
std::unique_ptr<PacedSender> process_thread_pacer_;
std::unique_ptr<TaskQueuePacedSender> task_queue_pacer_;
TargetTransferRateObserver* observer_ RTC_GUARDED_BY(task_queue_);

View File

@ -26,6 +26,8 @@ rtc_library("pacing") {
"round_robin_packet_queue.cc",
"round_robin_packet_queue.h",
"rtp_packet_pacer.h",
"task_queue_paced_sender.cc",
"task_queue_paced_sender.h",
]
deps = [
@ -33,6 +35,7 @@ rtc_library("pacing") {
"..:module_api",
"../../api:function_view",
"../../api/rtc_event_log",
"../../api/task_queue:task_queue",
"../../api/transport:field_trial_based_config",
"../../api/transport:network_control",
"../../api/transport:webrtc_key_value_config",
@ -44,7 +47,10 @@ rtc_library("pacing") {
"../../logging:rtc_event_pacing",
"../../rtc_base:checks",
"../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_task_queue",
"../../rtc_base/experiments:field_trial_parser",
"../../rtc_base/synchronization:sequence_checker",
"../../rtc_base/task_utils:to_queued_task",
"../../system_wrappers",
"../../system_wrappers:metrics",
"../remote_bitrate_estimator",
@ -78,6 +84,7 @@ if (rtc_include_tests) {
"paced_sender_unittest.cc",
"pacing_controller_unittest.cc",
"packet_router_unittest.cc",
"task_queue_paced_sender_unittest.cc",
]
deps = [
":interval_budget",
@ -93,6 +100,7 @@ if (rtc_include_tests) {
"../../system_wrappers:field_trial",
"../../test:field_trial",
"../../test:test_support",
"../../test/time_controller:time_controller",
"../rtp_rtcp",
"../rtp_rtcp:mock_rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format",

View File

@ -241,6 +241,10 @@ DataSize PacingController::QueueSizeData() const {
return packet_queue_.Size();
}
DataSize PacingController::CurrentBufferLevel() const {
return std::max(media_debt_, padding_debt_);
}
absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
return first_sent_packet_time_;
}

View File

@ -111,9 +111,14 @@ class PacingController {
// Returns the time since the oldest queued packet was enqueued.
TimeDelta OldestPacketWaitTime() const;
// Number of packets in the pacer queue.
size_t QueueSizePackets() const;
// Totals size of packets in the pacer queue.
DataSize QueueSizeData() const;
// Current buffer level, i.e. max of media and padding debt.
DataSize CurrentBufferLevel() const;
// Returns the time when the first packet was sent;
absl::optional<Timestamp> FirstSentPacketTime() const;

View File

@ -0,0 +1,254 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/task_queue_paced_sender.h"
#include <algorithm>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
namespace webrtc {
namespace {
// If no calls to MaybeProcessPackets() happen, make sure we update stats
// at least every |kMaxTimeBetweenStatsUpdates| as long as the pacer isn't
// completely drained.
constexpr TimeDelta kMaxTimeBetweenStatsUpdates = TimeDelta::Millis<33>();
// Don't call UpdateStats() more than |kMinTimeBetweenStatsUpdates| apart,
// for performance reasons.
constexpr TimeDelta kMinTimeBetweenStatsUpdates = TimeDelta::Millis<1>();
} // namespace
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory)
: clock_(clock),
packet_router_(packet_router),
pacing_controller_(clock,
static_cast<PacingController::PacketSender*>(this),
event_log,
field_trials,
PacingController::ProcessMode::kDynamic),
next_process_time_(Timestamp::MinusInfinity()),
stats_update_scheduled_(false),
last_stats_time_(Timestamp::MinusInfinity()),
is_shutdown_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {}
TaskQueuePacedSender::~TaskQueuePacedSender() {
// Post an immediate task to mark the queue as shutting down.
// The rtc::TaskQueue destructor will wait for pending tasks to
// complete before continuing.
task_queue_.PostTask([&]() {
RTC_DCHECK_RUN_ON(&task_queue_);
is_shutdown_ = true;
});
}
void TaskQueuePacedSender::CreateProbeCluster(DataRate bitrate,
int cluster_id) {
task_queue_.PostTask([this, bitrate, cluster_id]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.CreateProbeCluster(bitrate, cluster_id);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::Pause() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Pause();
});
}
void TaskQueuePacedSender::Resume() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.Resume();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetCongestionWindow(
DataSize congestion_window_size) {
task_queue_.PostTask([this, congestion_window_size]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetCongestionWindow(congestion_window_size);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::UpdateOutstandingData(DataSize outstanding_data) {
if (task_queue_.IsCurrent()) {
RTC_DCHECK_RUN_ON(&task_queue_);
// Fast path since this can be called once per sent packet while on the
// task queue.
pacing_controller_.UpdateOutstandingData(outstanding_data);
return;
}
task_queue_.PostTask([this, outstanding_data]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.UpdateOutstandingData(outstanding_data);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetPacingRates(DataRate pacing_rate,
DataRate padding_rate) {
task_queue_.PostTask([this, pacing_rate, padding_rate]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetPacingRates(pacing_rate, padding_rate);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) {
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
pacing_controller_.EnqueuePacket(std::move(packet));
}
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
task_queue_.PostTask([this, account_for_audio]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetAccountForAudioPackets(account_for_audio);
});
}
void TaskQueuePacedSender::SetQueueTimeLimit(TimeDelta limit) {
task_queue_.PostTask([this, limit]() {
RTC_DCHECK_RUN_ON(&task_queue_);
pacing_controller_.SetQueueTimeLimit(limit);
MaybeProcessPackets(Timestamp::MinusInfinity());
});
}
TimeDelta TaskQueuePacedSender::ExpectedQueueTime() const {
return GetStats().expected_queue_time;
}
DataSize TaskQueuePacedSender::QueueSizeData() const {
return GetStats().queue_size;
}
absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
return GetStats().first_sent_packet_time;
}
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
return GetStats().oldest_packet_wait_time;
}
void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(&task_queue_);
if (is_shutdown_) {
return;
}
const Timestamp now = clock_->CurrentTime();
// Run ProcessPackets() only if this is the schedules task, or if there is
// no scheduled task and we need to process immediately.
if ((scheduled_process_time.IsFinite() &&
scheduled_process_time == next_process_time_) ||
(next_process_time_.IsInfinite() &&
pacing_controller_.NextSendTime() <= now)) {
pacing_controller_.ProcessPackets();
next_process_time_ = Timestamp::MinusInfinity();
}
Timestamp next_process_time = std::max(now + PacingController::kMinSleepTime,
pacing_controller_.NextSendTime());
TimeDelta sleep_time = next_process_time - now;
if (next_process_time_.IsMinusInfinity() ||
next_process_time <=
next_process_time_ - PacingController::kMinSleepTime) {
next_process_time_ = next_process_time;
task_queue_.PostDelayedTask(
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
sleep_time.ms<uint32_t>());
}
MaybeUpdateStats(false);
}
std::vector<std::unique_ptr<RtpPacketToSend>>
TaskQueuePacedSender::GeneratePadding(DataSize size) {
return packet_router_->GeneratePadding(size.bytes());
}
void TaskQueuePacedSender::SendRtpPacket(
std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
packet_router_->SendPacket(std::move(packet), cluster_info);
}
void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
if (is_shutdown_) {
return;
}
Timestamp now = clock_->CurrentTime();
if (!is_scheduled_call &&
now - last_stats_time_ < kMinTimeBetweenStatsUpdates) {
// Too frequent unscheduled stats update, return early.
return;
}
rtc::CritScope cs(&stats_crit_);
current_stats_.expected_queue_time = pacing_controller_.ExpectedQueueTime();
current_stats_.first_sent_packet_time =
pacing_controller_.FirstSentPacketTime();
current_stats_.oldest_packet_wait_time =
pacing_controller_.OldestPacketWaitTime();
current_stats_.queue_size = pacing_controller_.QueueSizeData();
last_stats_time_ = now;
bool pacer_drained = pacing_controller_.QueueSizePackets() == 0 &&
pacing_controller_.CurrentBufferLevel().IsZero();
// If there's anything interesting to get from the pacer and this is a
// scheduled call (no scheduled call in flight), post a new scheduled stats
// update.
if (!pacer_drained && (is_scheduled_call || !stats_update_scheduled_)) {
task_queue_.PostDelayedTask(
[this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
MaybeUpdateStats(true);
},
kMaxTimeBetweenStatsUpdates.ms<uint32_t>());
stats_update_scheduled_ = true;
} else {
stats_update_scheduled_ = false;
}
}
TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
rtc::CritScope cs(&stats_crit_);
return current_stats_;
}
} // namespace webrtc

View File

@ -0,0 +1,162 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
#define MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <memory>
#include <queue>
#include <vector>
#include "absl/types/optional.h"
#include "api/task_queue/task_queue_factory.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/include/module.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/pacing/packet_router.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
class Clock;
class RtcEventLog;
class TaskQueuePacedSender : public RtpPacketPacer,
public RtpPacketSender,
private PacingController::PacketSender {
public:
TaskQueuePacedSender(Clock* clock,
PacketRouter* packet_router,
RtcEventLog* event_log,
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory);
~TaskQueuePacedSender() override;
// Methods implementing RtpPacketSender.
// Adds the packet to the queue and calls PacketRouter::SendPacket() when
// it's time to send.
void EnqueuePackets(
std::vector<std::unique_ptr<RtpPacketToSend>> packets) override;
// Methods implementing RtpPacketPacer:
void CreateProbeCluster(DataRate bitrate, int cluster_id) override;
// Temporarily pause all sending.
void Pause() override;
// Resume sending packets.
void Resume() override;
void SetCongestionWindow(DataSize congestion_window_size) override;
void UpdateOutstandingData(DataSize outstanding_data) override;
// Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(DataRate pacing_rate, DataRate padding_rate) override;
// Currently audio traffic is not accounted for by pacer and passed through.
// With the introduction of audio BWE, audio traffic will be accounted for
// in the pacer budget calculation. The audio traffic will still be injected
// at high priority.
void SetAccountForAudioPackets(bool account_for_audio) override;
// Returns the time since the oldest queued packet was enqueued.
TimeDelta OldestPacketWaitTime() const override;
// Returns total size of all packets in the pacer queue.
DataSize QueueSizeData() const override;
// Returns the time when the first packet was sent;
absl::optional<Timestamp> FirstSentPacketTime() const override;
// Returns the number of milliseconds it will take to send the current
// packets in the queue, given the current size and bitrate, ignoring prio.
TimeDelta ExpectedQueueTime() const override;
// Set the max desired queuing delay, pacer will override the pacing rate
// specified by SetPacingRates() if needed to achieve this goal.
void SetQueueTimeLimit(TimeDelta limit) override;
private:
struct Stats {
Stats()
: oldest_packet_wait_time(TimeDelta::Zero()),
queue_size(DataSize::Zero()),
expected_queue_time(TimeDelta::Zero()) {}
TimeDelta oldest_packet_wait_time;
DataSize queue_size;
TimeDelta expected_queue_time;
absl::optional<Timestamp> first_sent_packet_time;
};
// Check if it is time to send packets, or schedule a delayed task if not.
// Use Timestamp::MinusInfinity() to indicate that this call has _not_
// been scheduled by the pacing controller. If this is the case, check if
// can execute immediately otherwise schedule a delay task that calls this
// method again with desired (finite) scheduled process time.
void MaybeProcessPackets(Timestamp scheduled_process_time);
// Methods implementing PacedSenderController:PacketSender.
void SendRtpPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) override
RTC_RUN_ON(task_queue_);
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePadding(
DataSize size) override RTC_RUN_ON(task_queue_);
void MaybeUpdateStats(bool is_scheduled_call) RTC_RUN_ON(task_queue_);
Stats GetStats() const;
Clock* const clock_;
PacketRouter* const packet_router_ RTC_GUARDED_BY(task_queue_);
PacingController pacing_controller_ RTC_GUARDED_BY(task_queue_);
// We want only one (valid) delayed process task in flight at a time.
// If the value of |next_process_time_| is finite, it is an id for a
// delayed task that will call MaybeProcessPackets() with that time
// as parameter.
// Timestamp::MinusInfinity() indicates no valid pending task.
Timestamp next_process_time_ RTC_GUARDED_BY(task_queue_);
// Since we don't want to support synchronous calls that wait for a
// task execution, we poll the stats at some interval and update
// |current_stats_|, which can in turn be polled at any time.
// True iff there is delayed task in flight that that will call
// UdpateStats().
bool stats_update_scheduled_ RTC_GUARDED_BY(task_queue_);
// Last time stats were updated.
Timestamp last_stats_time_ RTC_GUARDED_BY(task_queue_);
// Indicates if this task queue is shutting down. If so, don't allow
// posting any more delayed tasks as that can cause the task queue to
// never drain.
bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
rtc::CriticalSection stats_crit_;
Stats current_stats_ RTC_GUARDED_BY(stats_crit_);
rtc::TaskQueue task_queue_;
};
} // namespace webrtc
#endif // MODULES_PACING_TASK_QUEUE_PACED_SENDER_H_

View File

@ -0,0 +1,176 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/task_queue_paced_sender.h"
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "modules/pacing/packet_router.h"
#include "modules/utility/include/mock/mock_process_thread.h"
#include "test/field_trial.h"
#include "test/gmock.h"
#include "test/gtest.h"
#include "test/time_controller/simulated_time_controller.h"
using ::testing::_;
using ::testing::Return;
using ::testing::SaveArg;
namespace webrtc {
namespace {
constexpr uint32_t kAudioSsrc = 12345;
constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234;
class MockPacketRouter : public PacketRouter {
public:
MOCK_METHOD2(SendPacket,
void(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info));
MOCK_METHOD1(
GeneratePadding,
std::vector<std::unique_ptr<RtpPacketToSend>>(size_t target_size_bytes));
};
} // namespace
namespace test {
class TaskQueuePacedSenderTest : public ::testing::Test {
public:
TaskQueuePacedSenderTest()
: time_controller_(Timestamp::ms(1234)),
pacer_(time_controller_.GetClock(),
&packet_router_,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller_.GetTaskQueueFactory()) {}
protected:
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
auto packet = std::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
switch (type) {
case RtpPacketToSend::Type::kAudio:
packet->SetSsrc(kAudioSsrc);
break;
case RtpPacketToSend::Type::kVideo:
packet->SetSsrc(kVideoSsrc);
break;
case RtpPacketToSend::Type::kRetransmission:
case RtpPacketToSend::Type::kPadding:
packet->SetSsrc(kVideoRtxSsrc);
break;
case RtpPacketToSend::Type::kForwardErrorCorrection:
packet->SetSsrc(kFlexFecSsrc);
break;
}
packet->SetPayloadSize(kDefaultPacketSize);
return packet;
}
std::vector<std::unique_ptr<RtpPacketToSend>> GeneratePackets(
RtpPacketToSend::Type type,
size_t num_packets) {
std::vector<std::unique_ptr<RtpPacketToSend>> packets;
for (size_t i = 0; i < num_packets; ++i) {
packets.push_back(BuildRtpPacket(type));
}
return packets;
}
Timestamp CurrentTime() { return time_controller_.GetClock()->CurrentTime(); }
GlobalSimulatedTimeController time_controller_;
MockPacketRouter packet_router_;
TaskQueuePacedSender pacer_;
};
TEST_F(TaskQueuePacedSenderTest, PacesPackets) {
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
pacer_.SetPacingRates(DataRate::bps(kDefaultPacketSize * 8 * kPacketsToSend),
DataRate::Zero());
pacer_.EnqueuePackets(
GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsToSend));
// Expect all of them to be sent.
size_t packets_sent = 0;
Timestamp end_time = Timestamp::PlusInfinity();
EXPECT_CALL(packet_router_, SendPacket)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
++packets_sent;
if (packets_sent == kPacketsToSend) {
end_time = time_controller_.GetClock()->CurrentTime();
}
});
const Timestamp start_time = time_controller_.GetClock()->CurrentTime();
// Packets should be sent over a period of close to 1s. Expect a little lower
// than this since initial probing is a bit quicker.
time_controller_.Sleep(TimeDelta::seconds(1));
EXPECT_EQ(packets_sent, kPacketsToSend);
ASSERT_TRUE(end_time.IsFinite());
EXPECT_NEAR((end_time - start_time).ms<double>(), 1000.0, 50.0);
}
TEST_F(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
const DataRate kPacingRate =
DataRate::bps(kDefaultPacketSize * 8 * kPacketsPerSecond);
pacer_.SetPacingRates(kPacingRate, DataRate::Zero());
// Send some initial packets to be rid of any probes.
EXPECT_CALL(packet_router_, SendPacket).Times(kPacketsPerSecond);
pacer_.EnqueuePackets(
GeneratePackets(RtpPacketToSend::Type::kVideo, kPacketsPerSecond));
time_controller_.Sleep(TimeDelta::seconds(1));
// Insert three packets, and record send time of each of them.
// After the second packet is sent, double the send rate so we can
// check the third packets is sent after half the wait time.
Timestamp first_packet_time = Timestamp::MinusInfinity();
Timestamp second_packet_time = Timestamp::MinusInfinity();
Timestamp third_packet_time = Timestamp::MinusInfinity();
EXPECT_CALL(packet_router_, SendPacket)
.Times(3)
.WillRepeatedly([&](std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
if (first_packet_time.IsInfinite()) {
first_packet_time = CurrentTime();
} else if (second_packet_time.IsInfinite()) {
second_packet_time = CurrentTime();
pacer_.SetPacingRates(2 * kPacingRate, DataRate::Zero());
} else {
third_packet_time = CurrentTime();
}
});
pacer_.EnqueuePackets(GeneratePackets(RtpPacketToSend::Type::kVideo, 3));
time_controller_.Sleep(TimeDelta::ms(500));
ASSERT_TRUE(third_packet_time.IsFinite());
EXPECT_NEAR((second_packet_time - first_packet_time).ms<double>(), 200.0,
1.0);
EXPECT_NEAR((third_packet_time - second_packet_time).ms<double>(), 100.0,
1.0);
}
} // namespace test
} // namespace webrtc