Pacer: Reduce TQ wake up and improve packet size estimation

The TQ Pacer schedules delayed task according to target time of
PacingController. It drains all valid ProcessPackets() in single loop,
denies retired scheduled tasks, and round up the timeout to 1ms.

This CL also improves packet size estimation in TQ Pacer by removing
zero initialization, and introduces `include_overhead_` configuration.

Tests:
1. webrtc_perf_tests: MaybeProcessPackets() calls
  2075147 -> 2007995

2. module_unittests: MaybeProcessPackets() calls
  203393 -> 183563

3. peerconnection_unittests: MaybeProcessPackets() calls
  66713-> 64333

Bug: webrtc:13417, webrtc:13437
Change-Id: I18eb0a36dbe063c606b1f27014df74a65ebfc486
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/242962
Reviewed-by: Erik Språng <sprang@webrtc.org>
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36179}
This commit is contained in:
Jianhui Dai
2022-03-07 20:21:06 +08:00
committed by WebRTC LUCI CQ
parent b7ba602765
commit 37195cf2e5
6 changed files with 232 additions and 260 deletions

View File

@ -79,7 +79,8 @@ RtpTransportControllerSend::PacerSettings::PacerSettings(
const WebRtcKeyValueConfig* trials)
: tq_disabled("Disabled"),
holdback_window("holdback_window", PacingController::kMinSleepTime),
holdback_packets("holdback_packets", -1) {
holdback_packets("holdback_packets",
TaskQueuePacedSender::kNoPacketHoldback) {
ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
trials->Lookup("WebRTC-TaskQueuePacer"));
}

View File

@ -38,11 +38,6 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2);
// time. Applies only to periodic mode.
constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30);
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the nearest
// millisecond.
constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1);
constexpr int kFirstPriority = 0;
bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
@ -94,6 +89,8 @@ const float PacingController::kDefaultPaceMultiplier = 2.5f;
const TimeDelta PacingController::kPausedProcessInterval =
kCongestedPacketInterval;
const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1);
const TimeDelta PacingController::kMaxEarlyProbeProcessing =
TimeDelta::Millis(1);
PacingController::PacingController(Clock* clock,
PacketSender* packet_sender,
@ -133,7 +130,7 @@ PacingController::PacingController(Clock* clock,
packet_counter_(0),
congestion_window_size_(DataSize::PlusInfinity()),
outstanding_data_(DataSize::Zero()),
queue_time_limit(kMaxExpectedQueueLength),
queue_time_limit_(kMaxExpectedQueueLength),
account_for_audio_(false),
include_overhead_(false) {
if (!drain_large_queues_) {
@ -224,6 +221,7 @@ void PacingController::SetPacingRates(DataRate pacing_rate,
media_rate_ = pacing_rate;
padding_rate_ = padding_rate;
pacing_bitrate_ = pacing_rate;
media_budget_.set_target_rate_kbps(pacing_rate.kbps());
padding_budget_.set_target_rate_kbps(padding_rate.kbps());
RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
@ -302,10 +300,7 @@ void PacingController::EnqueuePacketInternal(
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_process_time);
UpdateBudgetWithElapsedTime(elapsed_time);
last_process_time_ = target_process_time;
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
@ -316,7 +311,6 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
if (last_process_time_.IsMinusInfinity() || now < last_process_time_) {
return TimeDelta::Zero();
}
RTC_DCHECK_GE(now, last_process_time_);
TimeDelta elapsed_time = now - last_process_time_;
last_process_time_ = now;
if (elapsed_time > kMaxElapsedTime) {
@ -333,8 +327,7 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
packet_counter_ == 0) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
TimeDelta elapsed_since_last_send = now - last_send_time_;
if (elapsed_since_last_send >= kCongestedPacketInterval) {
if (now - last_send_time_ >= kCongestedPacketInterval) {
return true;
}
}
@ -343,17 +336,17 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
Timestamp PacingController::NextSendTime() const {
const Timestamp now = CurrentTime();
Timestamp next_send_time = Timestamp::PlusInfinity();
if (paused_) {
return last_send_time_ + kPausedProcessInterval;
}
// If probing is active, that always takes priority.
if (prober_.is_probing()) {
if (prober_.is_probing() && !probing_send_failure_) {
Timestamp probe_time = prober_.NextProbeTime(now);
// `probe_time` == PlusInfinity indicates no probe scheduled.
if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) {
return probe_time;
if (!probe_time.IsPlusInfinity()) {
return probe_time.IsMinusInfinity() ? now : probe_time;
}
}
@ -365,86 +358,53 @@ Timestamp PacingController::NextSendTime() const {
// In dynamic mode, figure out when the next packet should be sent,
// given the current conditions.
if (!pace_audio_) {
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> audio_enqueue_time =
packet_queue_.LeadingAudioPacketEnqueueTime();
if (audio_enqueue_time.has_value()) {
return *audio_enqueue_time;
}
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> unpaced_audio_time =
pace_audio_ ? absl::nullopt
: packet_queue_.LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time) {
return *unpaced_audio_time;
}
// We need to at least send keep-alive packets with some interval.
if (Congested() || packet_counter_ == 0) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
// Check how long until we can send the next media packet.
if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + media_debt_ / media_rate_);
}
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// Check how long until we can send the next media packet.
next_send_time = last_process_time_ + media_debt_ / media_rate_;
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
RTC_DCHECK_GT(media_rate_, DataRate::Zero());
TimeDelta drain_time =
std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + drain_time);
next_send_time = last_process_time_ + drain_time;
} else {
// Nothing to do.
next_send_time = last_process_time_ + kPausedProcessInterval;
}
if (send_padding_if_silent_) {
return last_send_time_ + kPausedProcessInterval;
next_send_time =
std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
}
return last_process_time_ + kPausedProcessInterval;
return next_send_time;
}
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
Timestamp target_send_time = now;
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
if (target_send_time.IsMinusInfinity()) {
target_send_time = now;
} else if (now < target_send_time - early_execute_margin) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
return;
}
if (target_send_time < last_process_time_) {
// After the last process call, at time X, the target send time
// shifted to be earlier than X. This should normally not happen
// but we want to make sure rounding errors or erratic behavior
// of NextSendTime() does not cause issue. In particular, if the
// buffer reduction of
// rate * (target_send_time - previous_process_time)
// in the main loop doesn't clean up the existing debt we may not
// be able to send again. We don't want to check this reordering
// there as it is the normal exit condtion when the buffer is
// exhausted and there are packets in the queue.
UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
target_send_time = last_process_time_;
}
}
Timestamp previous_process_time = last_process_time_;
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
if (ShouldSendKeepalive(now)) {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ == 0) {
last_send_time_ = now;
} else {
DataSize keepalive_data_sent = DataSize::Zero();
if (packet_counter_ > 0) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
@ -455,14 +415,29 @@ void PacingController::ProcessPackets() {
EnqueuePacket(std::move(packet));
}
}
OnPaddingSent(keepalive_data_sent);
}
OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now);
}
if (paused_) {
return;
}
if (mode_ == ProcessMode::kDynamic) {
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
target_send_time = NextSendTime();
if (now + early_execute_margin < target_send_time) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
return;
}
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
@ -474,7 +449,7 @@ void PacingController::ProcessPackets() {
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit - packet_queue_.AverageQueueTime());
queue_time_limit_ - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
@ -489,13 +464,12 @@ void PacingController::ProcessPackets() {
// up to (process interval duration) * (target rate), so we only need to
// update it once before the packet sending loop.
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
} else {
media_rate_ = target_rate;
}
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
@ -504,9 +478,23 @@ void PacingController::ProcessPackets() {
// use actual send time rather than target.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
if (pacing_info.probe_cluster_bytes_sent == 0) {
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
}
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
@ -514,102 +502,74 @@ void PacingController::ProcessPackets() {
}
DataSize data_sent = DataSize::Zero();
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
if (first_packet_in_probe) {
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
first_packet_in_probe = false;
}
if (mode_ == ProcessMode::kDynamic &&
previous_process_time < target_send_time) {
// Reduce buffer levels with amount corresponding to time between last
// process and target send time for the next packet.
// If the process call is late, that may be the time between the optimal
// send times for two packets we should already have sent.
UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
previous_process_time = target_send_time;
}
// Fetch the next packet, so long as queue is not empty or budget is not
while (true) {
// Fetch packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
if (!padding_packets.empty()) {
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
} else {
// Can't generate padding, still update padding budget for next send
// time.
UpdatePaddingBudgetWithSentData(padding_to_add);
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
}
} else {
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// Send done, update send time.
OnPacketSent(packet_type, packet_size, now);
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
if (mode_ == ProcessMode::kDynamic) {
// Update target send time in case that are more packets that we are late
// in processing.
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsMinusInfinity()) {
target_send_time = now;
} else {
target_send_time = std::min(now, next_send_time);
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
if (target_send_time > now) {
// Exit loop if not probing.
if (!is_probing) {
break;
}
target_send_time = now;
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
}
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
@ -631,8 +591,8 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
}
if (packet_counter_ == 0) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
return DataSize::Zero();
}
@ -697,25 +657,16 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time) {
if (!first_sent_packet_time_) {
if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) {
first_sent_packet_time_ = send_time;
}
bool audio_packet = packet_type == RtpPacketMediaType::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) {
UpdateBudgetWithSentData(packet_size);
}
last_send_time_ = send_time;
last_process_time_ = send_time;
}
void PacingController::OnPaddingSent(DataSize data_sent) {
if (data_sent > DataSize::Zero()) {
UpdateBudgetWithSentData(data_sent);
}
Timestamp now = CurrentTime();
last_send_time_ = now;
last_process_time_ = now;
last_send_time_ = send_time;
}
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
@ -733,17 +684,24 @@ void PacingController::UpdateBudgetWithSentData(DataSize size) {
outstanding_data_ += size;
if (mode_ == ProcessMode::kPeriodic) {
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());
} else {
media_debt_ += size;
media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime);
}
UpdatePaddingBudgetWithSentData(size);
}
void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) {
if (mode_ == ProcessMode::kPeriodic) {
padding_budget_.UseBudget(size.bytes());
} else {
padding_debt_ += size;
padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime);
}
}
void PacingController::SetQueueTimeLimit(TimeDelta limit) {
queue_time_limit = limit;
queue_time_limit_ = limit;
}
} // namespace webrtc

View File

@ -79,6 +79,11 @@ class PacingController {
static const TimeDelta kMinSleepTime;
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the
// nearest millisecond.
static const TimeDelta kMaxEarlyProbeProcessing;
PacingController(Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
@ -158,6 +163,7 @@ class PacingController {
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(TimeDelta delta);
void UpdateBudgetWithSentData(DataSize size);
void UpdatePaddingBudgetWithSentData(DataSize size);
DataSize PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const;
@ -169,7 +175,6 @@ class PacingController {
void OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time);
void OnPaddingSent(DataSize padding_sent);
Timestamp CurrentTime() const;
@ -196,9 +201,9 @@ class PacingController {
mutable Timestamp last_timestamp_;
bool paused_;
// In dynamic mode, `media_budget_` and `padding_budget_` will be used to
// In periodic mode, `media_budget_` and `padding_budget_` will be used to
// track when packets can be sent.
// In periodic mode, `media_debt_` and `padding_debt_` will be used together
// In dynamic mode, `media_debt_` and `padding_debt_` will be used together
// with the target rates.
// This is the media budget, keeping track of how many bits of media
@ -229,7 +234,7 @@ class PacingController {
DataSize congestion_window_size_;
DataSize outstanding_data_;
TimeDelta queue_time_limit;
TimeDelta queue_time_limit_;
bool account_for_audio_;
bool include_overhead_;
};

View File

@ -12,15 +12,14 @@
#include <algorithm>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/trace_event.h"
namespace webrtc {
const int TaskQueuePacedSender::kNoPacketHoldback = -1;
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
PacingController::PacketSender* packet_sender,
@ -41,10 +40,11 @@ TaskQueuePacedSender::TaskQueuePacedSender(
is_started_(false),
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {
packet_size_.Apply(1, 0);
RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
}
TaskQueuePacedSender::~TaskQueuePacedSender() {
@ -140,7 +140,11 @@ void TaskQueuePacedSender::EnqueuePackets(
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
packet_size_.Apply(1, packet->size());
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
@ -159,6 +163,7 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
@ -194,13 +199,16 @@ absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite())
if (oldest_packet.IsInfinite()) {
return TimeDelta::Zero();
}
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet)
if (current < oldest_packet) {
return TimeDelta::Zero();
}
return current - oldest_packet;
}
@ -213,70 +221,70 @@ void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(&task_queue_);
#if RTC_TRACE_EVENTS_ENABLED
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::MaybeProcessPackets");
#endif
if (is_shutdown_ || !is_started_) {
return;
}
// Normally, run ProcessPackets() only if this is the scheduled task.
// If it is not but it is already time to process and there either is
// no scheduled task or the schedule has shifted forward in time, run
// anyway and clear any schedule.
Timestamp next_process_time = pacing_controller_.NextSendTime();
Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
const Timestamp now = clock_->CurrentTime();
const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
if (is_scheduled_call) {
// Indicate no pending scheduled call.
TimeDelta early_execute_margin =
pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
}
UpdateStats();
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
if (scheduled_process_time != next_process_time_) {
return;
}
next_process_time_ = Timestamp::MinusInfinity();
}
if (is_scheduled_call ||
(now >= next_process_time && (next_process_time_.IsInfinite() ||
next_process_time < next_process_time_))) {
pacing_controller_.ProcessPackets();
next_process_time = pacing_controller_.NextSendTime();
}
TimeDelta hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered());
if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() &&
!avg_packet_size.IsZero()) {
TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) {
// If we're probing and there isn't already a wakeup scheduled for the next
// process time, always post a task and just round sleep time down to
// nearest millisecond.
if (next_process_time.IsMinusInfinity()) {
time_to_next_process = TimeDelta::Zero();
} else {
time_to_next_process =
std::max(TimeDelta::Zero(),
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
// Do not hold back in probing.
TimeDelta hold_back_window = TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
!pacing_rate.IsZero() &&
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
TimeDelta avg_packet_send_time =
DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
} else if (next_process_time_.IsMinusInfinity() ||
next_process_time <= next_process_time_ - hold_back_window) {
// Schedule a new task since there is none currently scheduled
// (`next_process_time_` is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled.
time_to_next_process = std::max(next_process_time - now, hold_back_window);
}
if (time_to_next_process) {
// Set a new scheduled process time and post a delayed task.
next_process_time_ = next_process_time;
// Calculate next process time.
TimeDelta time_to_next_process =
std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;
// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
task_queue_.PostDelayedHighPrecisionTask(
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
time_to_next_process->ms<uint32_t>());
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>());
next_process_time_ = next_send_time;
}
UpdateStats();
}
void TaskQueuePacedSender::UpdateStats() {

View File

@ -14,9 +14,7 @@
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <memory>
#include <queue>
#include <vector>
#include "absl/types/optional.h"
@ -29,7 +27,6 @@
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
@ -39,6 +36,8 @@ class RtcEventLog;
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
static const int kNoPacketHoldback;
// The `hold_back_window` parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
@ -51,7 +50,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
TimeDelta max_hold_back_window = PacingController::kMinSleepTime,
int max_hold_back_window_in_packets = -1);
int max_hold_back_window_in_packets = kNoPacketHoldback);
~TaskQueuePacedSender() override;
@ -156,6 +155,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);

View File

@ -37,7 +37,6 @@ constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234;
constexpr int kNoPacketHoldback = -1;
class MockPacketRouter : public PacketRouter {
public:
@ -120,7 +119,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
@ -160,7 +159,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
@ -212,7 +211,7 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -241,11 +240,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -278,11 +277,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -306,7 +305,7 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
@ -314,7 +313,7 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -339,15 +338,16 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
// Advance to less than 3ms before next packet send time.
time_controller.AdvanceTime(TimeDelta::Micros(1001));
// Trigger a probe at 4x the current pacing rate and insert the number of
// Trigger a probe at 2x the current pacing rate and insert the number of
// packets the probe needs.
const DataRate kProbeRate = 2 * kPacingDataRate;
const int kProbeClusterId = 1;
pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
// Expected size for each probe in a cluster is twice the expected bits
// sent during min_probe_delta.
// Expect one additional call since probe always starts with a small
// Expected size for each probe in a cluster is twice the expected bits sent
// during min_probe_delta.
// Expect one additional call since probe always starts with a small (1 byte)
// padding packet that's not counted into the probe rate here.
const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
const size_t kNumPacketsInProbe =
@ -381,7 +381,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -423,8 +423,8 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
// Verify the amount of probing data sent.
// Probe always starts with a small (1 byte) padding packet that's not
// counted into the probe rate here.
EXPECT_EQ(data_sent,
kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
const DataSize kMinProbeSize = 2 * kMinProbeDelta * kProbingRate;
EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
}
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
@ -534,7 +534,7 @@ TEST(TaskQueuePacedSenderTest, Stats) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, kNoPacketHoldback);
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
// Simulate ~2mbps video stream, covering one second.
static constexpr size_t kPacketsToSend = 200;