Revert "Pacer: Reduce TQ wake up and improve packet size estimation"

This reverts commit 37195cf2e577cc09ad1362d046b5c8a9b65d4f99.

Reason for revert: Breaks downstream tests (more investigations and testing is necessary).

Original change's description:
> Pacer: Reduce TQ wake up and improve packet size estimation
>
> The TQ Pacer schedules delayed task according to target time of
> PacingController. It drains all valid ProcessPackets() in single loop,
> denies retired scheduled tasks, and round up the timeout to 1ms.
>
> This CL also improves packet size estimation in TQ Pacer by removing
> zero initialization, and introduces `include_overhead_` configuration.
>
> Tests:
> 1. webrtc_perf_tests: MaybeProcessPackets() calls
>   2075147 -> 2007995
>
> 2. module_unittests: MaybeProcessPackets() calls
>   203393 -> 183563
>
> 3. peerconnection_unittests: MaybeProcessPackets() calls
>   66713-> 64333
>
> Bug: webrtc:13417, webrtc:13437
> Change-Id: I18eb0a36dbe063c606b1f27014df74a65ebfc486
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/242962
> Reviewed-by: Erik Språng <sprang@webrtc.org>
> Reviewed-by: Henrik Boström <hbos@webrtc.org>
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Cr-Commit-Position: refs/heads/main@{#36179}

No-Try: True
Bug: webrtc:13417, webrtc:13437
Change-Id: I5418d26d3978f21765ef38acfb002398e671e036
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/255301
Reviewed-by: Henrik Boström <hbos@webrtc.org>
Owners-Override: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Bot-Commit: rubber-stamper@appspot.gserviceaccount.com <rubber-stamper@appspot.gserviceaccount.com>
Cr-Commit-Position: refs/heads/main@{#36185}
This commit is contained in:
Mirko Bonadei
2022-03-14 09:13:41 +00:00
committed by WebRTC LUCI CQ
parent f2970c8b37
commit d8543dedf2
6 changed files with 260 additions and 232 deletions

View File

@ -79,8 +79,7 @@ RtpTransportControllerSend::PacerSettings::PacerSettings(
const WebRtcKeyValueConfig* trials)
: tq_disabled("Disabled"),
holdback_window("holdback_window", PacingController::kMinSleepTime),
holdback_packets("holdback_packets",
TaskQueuePacedSender::kNoPacketHoldback) {
holdback_packets("holdback_packets", -1) {
ParseFieldTrial({&tq_disabled, &holdback_window, &holdback_packets},
trials->Lookup("WebRTC-TaskQueuePacer"));
}

View File

@ -38,6 +38,11 @@ constexpr TimeDelta kMaxElapsedTime = TimeDelta::Seconds(2);
// time. Applies only to periodic mode.
constexpr TimeDelta kMaxProcessingInterval = TimeDelta::Millis(30);
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the nearest
// millisecond.
constexpr TimeDelta kMaxEarlyProbeProcessing = TimeDelta::Millis(1);
constexpr int kFirstPriority = 0;
bool IsDisabled(const WebRtcKeyValueConfig& field_trials,
@ -89,8 +94,6 @@ const float PacingController::kDefaultPaceMultiplier = 2.5f;
const TimeDelta PacingController::kPausedProcessInterval =
kCongestedPacketInterval;
const TimeDelta PacingController::kMinSleepTime = TimeDelta::Millis(1);
const TimeDelta PacingController::kMaxEarlyProbeProcessing =
TimeDelta::Millis(1);
PacingController::PacingController(Clock* clock,
PacketSender* packet_sender,
@ -130,7 +133,7 @@ PacingController::PacingController(Clock* clock,
packet_counter_(0),
congestion_window_size_(DataSize::PlusInfinity()),
outstanding_data_(DataSize::Zero()),
queue_time_limit_(kMaxExpectedQueueLength),
queue_time_limit(kMaxExpectedQueueLength),
account_for_audio_(false),
include_overhead_(false) {
if (!drain_large_queues_) {
@ -221,7 +224,6 @@ void PacingController::SetPacingRates(DataRate pacing_rate,
media_rate_ = pacing_rate;
padding_rate_ = padding_rate;
pacing_bitrate_ = pacing_rate;
media_budget_.set_target_rate_kbps(pacing_rate.kbps());
padding_budget_.set_target_rate_kbps(padding_rate.kbps());
RTC_LOG(LS_VERBOSE) << "bwe:pacer_updated pacing_kbps="
@ -300,7 +302,10 @@ void PacingController::EnqueuePacketInternal(
// Use that as last process time only if it's prior to now.
target_process_time = std::min(now, next_send_time);
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_process_time);
UpdateBudgetWithElapsedTime(elapsed_time);
last_process_time_ = target_process_time;
}
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
@ -311,6 +316,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
if (last_process_time_.IsMinusInfinity() || now < last_process_time_) {
return TimeDelta::Zero();
}
RTC_DCHECK_GE(now, last_process_time_);
TimeDelta elapsed_time = now - last_process_time_;
last_process_time_ = now;
if (elapsed_time > kMaxElapsedTime) {
@ -327,7 +333,8 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
packet_counter_ == 0) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
if (now - last_send_time_ >= kCongestedPacketInterval) {
TimeDelta elapsed_since_last_send = now - last_send_time_;
if (elapsed_since_last_send >= kCongestedPacketInterval) {
return true;
}
}
@ -336,17 +343,17 @@ bool PacingController::ShouldSendKeepalive(Timestamp now) const {
Timestamp PacingController::NextSendTime() const {
const Timestamp now = CurrentTime();
Timestamp next_send_time = Timestamp::PlusInfinity();
if (paused_) {
return last_send_time_ + kPausedProcessInterval;
}
// If probing is active, that always takes priority.
if (prober_.is_probing() && !probing_send_failure_) {
if (prober_.is_probing()) {
Timestamp probe_time = prober_.NextProbeTime(now);
if (!probe_time.IsPlusInfinity()) {
return probe_time.IsMinusInfinity() ? now : probe_time;
// `probe_time` == PlusInfinity indicates no probe scheduled.
if (probe_time != Timestamp::PlusInfinity() && !probing_send_failure_) {
return probe_time;
}
}
@ -358,53 +365,86 @@ Timestamp PacingController::NextSendTime() const {
// In dynamic mode, figure out when the next packet should be sent,
// given the current conditions.
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> unpaced_audio_time =
pace_audio_ ? absl::nullopt
: packet_queue_.LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time) {
return *unpaced_audio_time;
if (!pace_audio_) {
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> audio_enqueue_time =
packet_queue_.LeadingAudioPacketEnqueueTime();
if (audio_enqueue_time.has_value()) {
return *audio_enqueue_time;
}
}
// We need to at least send keep-alive packets with some interval.
if (Congested() || packet_counter_ == 0) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
// Check how long until we can send the next media packet.
if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
// Check how long until we can send the next media packet.
next_send_time = last_process_time_ + media_debt_ / media_rate_;
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
RTC_DCHECK_GT(media_rate_, DataRate::Zero());
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + media_debt_ / media_rate_);
}
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
TimeDelta drain_time =
std::max(media_debt_ / media_rate_, padding_debt_ / padding_rate_);
next_send_time = last_process_time_ + drain_time;
} else {
// Nothing to do.
next_send_time = last_process_time_ + kPausedProcessInterval;
return std::min(last_send_time_ + kPausedProcessInterval,
last_process_time_ + drain_time);
}
if (send_padding_if_silent_) {
next_send_time =
std::min(next_send_time, last_send_time_ + kPausedProcessInterval);
return last_send_time_ + kPausedProcessInterval;
}
return next_send_time;
return last_process_time_ + kPausedProcessInterval;
}
void PacingController::ProcessPackets() {
Timestamp now = CurrentTime();
Timestamp target_send_time = now;
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
if (target_send_time.IsMinusInfinity()) {
target_send_time = now;
} else if (now < target_send_time - early_execute_margin) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
UpdateBudgetWithElapsedTime(elapsed_time);
return;
}
if (target_send_time < last_process_time_) {
// After the last process call, at time X, the target send time
// shifted to be earlier than X. This should normally not happen
// but we want to make sure rounding errors or erratic behavior
// of NextSendTime() does not cause issue. In particular, if the
// buffer reduction of
// rate * (target_send_time - previous_process_time)
// in the main loop doesn't clean up the existing debt we may not
// be able to send again. We don't want to check this reordering
// there as it is the normal exit condtion when the buffer is
// exhausted and there are packets in the queue.
UpdateBudgetWithElapsedTime(last_process_time_ - target_send_time);
target_send_time = last_process_time_;
}
}
Timestamp previous_process_time = last_process_time_;
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(now);
if (ShouldSendKeepalive(now)) {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ > 0) {
if (packet_counter_ == 0) {
last_send_time_ = now;
} else {
DataSize keepalive_data_sent = DataSize::Zero();
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
@ -415,29 +455,14 @@ void PacingController::ProcessPackets() {
EnqueuePacket(std::move(packet));
}
}
OnPaddingSent(keepalive_data_sent);
}
OnPacketSent(RtpPacketMediaType::kPadding, keepalive_data_sent, now);
}
if (paused_) {
return;
}
if (mode_ == ProcessMode::kDynamic) {
TimeDelta early_execute_margin =
prober_.is_probing() ? kMaxEarlyProbeProcessing : TimeDelta::Zero();
target_send_time = NextSendTime();
if (now + early_execute_margin < target_send_time) {
// We are too early, but if queue is empty still allow draining some debt.
// Probing is allowed to be sent up to kMinSleepTime early.
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(now));
return;
}
}
TimeDelta elapsed_time = UpdateTimeAndGetElapsed(target_send_time);
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
@ -449,7 +474,7 @@ void PacingController::ProcessPackets() {
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_.AverageQueueTime());
queue_time_limit - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
@ -464,12 +489,13 @@ void PacingController::ProcessPackets() {
// up to (process interval duration) * (target rate), so we only need to
// update it once before the packet sending loop.
media_budget_.set_target_rate_kbps(target_rate.kbps());
UpdateBudgetWithElapsedTime(elapsed_time);
} else {
media_rate_ = target_rate;
}
UpdateBudgetWithElapsedTime(elapsed_time);
}
bool first_packet_in_probe = false;
PacedPacketInfo pacing_info;
DataSize recommended_probe_size = DataSize::Zero();
bool is_probing = prober_.is_probing();
@ -478,23 +504,9 @@ void PacingController::ProcessPackets() {
// use actual send time rather than target.
pacing_info = prober_.CurrentCluster(now).value_or(PacedPacketInfo());
if (pacing_info.probe_cluster_id != PacedPacketInfo::kNotAProbe) {
first_packet_in_probe = pacing_info.probe_cluster_bytes_sent == 0;
recommended_probe_size = prober_.RecommendedMinProbeSize();
RTC_DCHECK_GT(recommended_probe_size, DataSize::Zero());
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
if (pacing_info.probe_cluster_bytes_sent == 0) {
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
}
} else {
// No valid probe cluster returned, probe might have timed out.
is_probing = false;
@ -502,74 +514,102 @@ void PacingController::ProcessPackets() {
}
DataSize data_sent = DataSize::Zero();
while (true) {
// Fetch packet, so long as queue is not empty or budget is not
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
if (first_packet_in_probe) {
// If first packet in probe, insert a small padding packet so we have a
// more reliable start window for the rate estimation.
auto padding = packet_sender_->GeneratePadding(DataSize::Bytes(1));
// If no RTP modules sending media are registered, we may not get a
// padding packet back.
if (!padding.empty()) {
// Insert with high priority so larger media packets don't preempt it.
EnqueuePacketInternal(std::move(padding[0]), kFirstPriority);
// We should never get more than one padding packets with a requested
// size of 1 byte.
RTC_DCHECK_EQ(padding.size(), 1u);
}
first_packet_in_probe = false;
}
if (mode_ == ProcessMode::kDynamic &&
previous_process_time < target_send_time) {
// Reduce buffer levels with amount corresponding to time between last
// process and target send time for the next packet.
// If the process call is late, that may be the time between the optimal
// send times for two packets we should already have sent.
UpdateBudgetWithElapsedTime(target_send_time - previous_process_time);
previous_process_time = target_send_time;
}
// Fetch the next packet, so long as queue is not empty or budget is not
// exhausted.
std::unique_ptr<RtpPacketToSend> rtp_packet =
GetPendingPacket(pacing_info, target_send_time, now);
if (rtp_packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
std::vector<std::unique_ptr<RtpPacketToSend>> padding_packets =
packet_sender_->GeneratePadding(padding_to_add);
if (!padding_packets.empty()) {
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
} else {
// Can't generate padding, still update padding budget for next send
// time.
UpdatePaddingBudgetWithSentData(padding_to_add);
if (padding_packets.empty()) {
// No padding packets were generated, quite send loop.
break;
}
for (auto& packet : padding_packets) {
EnqueuePacket(std::move(packet));
}
// Continue loop to send the padding that was just added.
continue;
}
// Can't fetch new packet and no padding to send, exit send loop.
break;
} else {
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
}
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
RTC_DCHECK(rtp_packet);
RTC_DCHECK(rtp_packet->packet_type().has_value());
const RtpPacketMediaType packet_type = *rtp_packet->packet_type();
DataSize packet_size = DataSize::Bytes(rtp_packet->payload_size() +
rtp_packet->padding_size());
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
if (include_overhead_) {
packet_size += DataSize::Bytes(rtp_packet->headers_size()) +
transport_overhead_per_packet_;
}
// Send done, update send time.
OnPacketSent(packet_type, packet_size, now);
packet_sender_->SendPacket(std::move(rtp_packet), pacing_info);
for (auto& packet : packet_sender_->FetchFec()) {
EnqueuePacket(std::move(packet));
}
data_sent += packet_size;
// If we are currently probing, we need to stop the send loop when we
// have reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
// Send done, update send/process time to the target send time.
OnPacketSent(packet_type, packet_size, target_send_time);
// If we are currently probing, we need to stop the send loop when we have
// reached the send target.
if (is_probing && data_sent >= recommended_probe_size) {
break;
}
if (mode_ == ProcessMode::kDynamic) {
// Update target send time in case that are more packets that we are late
// in processing.
if (mode_ == ProcessMode::kDynamic) {
target_send_time = NextSendTime();
if (target_send_time > now) {
// Exit loop if not probing.
if (!is_probing) {
break;
}
target_send_time = now;
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_send_time));
Timestamp next_send_time = NextSendTime();
if (next_send_time.IsMinusInfinity()) {
target_send_time = now;
} else {
target_send_time = std::min(now, next_send_time);
}
}
}
last_process_time_ = std::max(last_process_time_, previous_process_time);
if (is_probing) {
probing_send_failure_ = data_sent == DataSize::Zero();
if (!probing_send_failure_) {
@ -591,8 +631,8 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
}
if (packet_counter_ == 0) {
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
return DataSize::Zero();
}
@ -657,16 +697,25 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time) {
if (!first_sent_packet_time_ && packet_type != RtpPacketMediaType::kPadding) {
if (!first_sent_packet_time_) {
first_sent_packet_time_ = send_time;
}
bool audio_packet = packet_type == RtpPacketMediaType::kAudio;
if ((!audio_packet || account_for_audio_) && packet_size > DataSize::Zero()) {
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
UpdateBudgetWithSentData(packet_size);
}
last_send_time_ = send_time;
last_process_time_ = send_time;
}
void PacingController::OnPaddingSent(DataSize data_sent) {
if (data_sent > DataSize::Zero()) {
UpdateBudgetWithSentData(data_sent);
}
Timestamp now = CurrentTime();
last_send_time_ = now;
last_process_time_ = now;
}
void PacingController::UpdateBudgetWithElapsedTime(TimeDelta delta) {
@ -684,24 +733,17 @@ void PacingController::UpdateBudgetWithSentData(DataSize size) {
outstanding_data_ += size;
if (mode_ == ProcessMode::kPeriodic) {
media_budget_.UseBudget(size.bytes());
padding_budget_.UseBudget(size.bytes());
} else {
media_debt_ += size;
media_debt_ = std::min(media_debt_, media_rate_ * kMaxDebtInTime);
}
UpdatePaddingBudgetWithSentData(size);
}
void PacingController::UpdatePaddingBudgetWithSentData(DataSize size) {
if (mode_ == ProcessMode::kPeriodic) {
padding_budget_.UseBudget(size.bytes());
} else {
padding_debt_ += size;
padding_debt_ = std::min(padding_debt_, padding_rate_ * kMaxDebtInTime);
}
}
void PacingController::SetQueueTimeLimit(TimeDelta limit) {
queue_time_limit_ = limit;
queue_time_limit = limit;
}
} // namespace webrtc

View File

@ -79,11 +79,6 @@ class PacingController {
static const TimeDelta kMinSleepTime;
// Allow probes to be processed slightly ahead of inteded send time. Currently
// set to 1ms as this is intended to allow times be rounded down to the
// nearest millisecond.
static const TimeDelta kMaxEarlyProbeProcessing;
PacingController(Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
@ -163,7 +158,6 @@ class PacingController {
// Updates the number of bytes that can be sent for the next time interval.
void UpdateBudgetWithElapsedTime(TimeDelta delta);
void UpdateBudgetWithSentData(DataSize size);
void UpdatePaddingBudgetWithSentData(DataSize size);
DataSize PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const;
@ -175,6 +169,7 @@ class PacingController {
void OnPacketSent(RtpPacketMediaType packet_type,
DataSize packet_size,
Timestamp send_time);
void OnPaddingSent(DataSize padding_sent);
Timestamp CurrentTime() const;
@ -201,9 +196,9 @@ class PacingController {
mutable Timestamp last_timestamp_;
bool paused_;
// In periodic mode, `media_budget_` and `padding_budget_` will be used to
// In dynamic mode, `media_budget_` and `padding_budget_` will be used to
// track when packets can be sent.
// In dynamic mode, `media_debt_` and `padding_debt_` will be used together
// In periodic mode, `media_debt_` and `padding_debt_` will be used together
// with the target rates.
// This is the media budget, keeping track of how many bits of media
@ -234,7 +229,7 @@ class PacingController {
DataSize congestion_window_size_;
DataSize outstanding_data_;
TimeDelta queue_time_limit_;
TimeDelta queue_time_limit;
bool account_for_audio_;
bool include_overhead_;
};

View File

@ -12,14 +12,15 @@
#include <algorithm>
#include <utility>
#include "absl/memory/memory.h"
#include "rtc_base/checks.h"
#include "rtc_base/event.h"
#include "rtc_base/logging.h"
#include "rtc_base/task_utils/to_queued_task.h"
#include "rtc_base/trace_event.h"
namespace webrtc {
const int TaskQueuePacedSender::kNoPacketHoldback = -1;
TaskQueuePacedSender::TaskQueuePacedSender(
Clock* clock,
PacingController::PacketSender* packet_sender,
@ -40,11 +41,10 @@ TaskQueuePacedSender::TaskQueuePacedSender(
is_started_(false),
is_shutdown_(false),
packet_size_(/*alpha=*/0.95),
include_overhead_(false),
task_queue_(task_queue_factory->CreateTaskQueue(
"TaskQueuePacedSender",
TaskQueueFactory::Priority::NORMAL)) {
RTC_DCHECK_GE(max_hold_back_window_, PacingController::kMinSleepTime);
packet_size_.Apply(1, 0);
}
TaskQueuePacedSender::~TaskQueuePacedSender() {
@ -140,11 +140,7 @@ void TaskQueuePacedSender::EnqueuePackets(
task_queue_.PostTask([this, packets_ = std::move(packets)]() mutable {
RTC_DCHECK_RUN_ON(&task_queue_);
for (auto& packet : packets_) {
size_t packet_size = packet->payload_size() + packet->padding_size();
if (include_overhead_) {
packet_size += packet->headers_size();
}
packet_size_.Apply(1, packet_size);
packet_size_.Apply(1, packet->size());
RTC_DCHECK_GE(packet->capture_time(), Timestamp::Zero());
pacing_controller_.EnqueuePacket(std::move(packet));
}
@ -163,7 +159,6 @@ void TaskQueuePacedSender::SetAccountForAudioPackets(bool account_for_audio) {
void TaskQueuePacedSender::SetIncludeOverhead() {
task_queue_.PostTask([this]() {
RTC_DCHECK_RUN_ON(&task_queue_);
include_overhead_ = true;
pacing_controller_.SetIncludeOverhead();
MaybeProcessPackets(Timestamp::MinusInfinity());
});
@ -199,16 +194,13 @@ absl::optional<Timestamp> TaskQueuePacedSender::FirstSentPacketTime() const {
TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
Timestamp oldest_packet = GetStats().oldest_packet_enqueue_time;
if (oldest_packet.IsInfinite()) {
if (oldest_packet.IsInfinite())
return TimeDelta::Zero();
}
// (webrtc:9716): The clock is not always monotonic.
Timestamp current = clock_->CurrentTime();
if (current < oldest_packet) {
if (current < oldest_packet)
return TimeDelta::Zero();
}
return current - oldest_packet;
}
@ -221,70 +213,70 @@ void TaskQueuePacedSender::MaybeProcessPackets(
Timestamp scheduled_process_time) {
RTC_DCHECK_RUN_ON(&task_queue_);
#if RTC_TRACE_EVENTS_ENABLED
TRACE_EVENT0(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"TaskQueuePacedSender::MaybeProcessPackets");
#endif
if (is_shutdown_ || !is_started_) {
return;
}
Timestamp next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
// Normally, run ProcessPackets() only if this is the scheduled task.
// If it is not but it is already time to process and there either is
// no scheduled task or the schedule has shifted forward in time, run
// anyway and clear any schedule.
Timestamp next_process_time = pacing_controller_.NextSendTime();
const Timestamp now = clock_->CurrentTime();
TimeDelta early_execute_margin =
pacing_controller_.IsProbing()
? PacingController::kMaxEarlyProbeProcessing
: TimeDelta::Zero();
// Process packets and update stats.
while (next_send_time <= now + early_execute_margin) {
pacing_controller_.ProcessPackets();
next_send_time = pacing_controller_.NextSendTime();
RTC_DCHECK(next_send_time.IsFinite());
}
UpdateStats();
// Ignore retired scheduled task, otherwise reset `next_process_time_`.
if (scheduled_process_time.IsFinite()) {
if (scheduled_process_time != next_process_time_) {
return;
}
const bool is_scheduled_call = next_process_time_ == scheduled_process_time;
if (is_scheduled_call) {
// Indicate no pending scheduled call.
next_process_time_ = Timestamp::MinusInfinity();
}
if (is_scheduled_call ||
(now >= next_process_time && (next_process_time_.IsInfinite() ||
next_process_time < next_process_time_))) {
pacing_controller_.ProcessPackets();
next_process_time = pacing_controller_.NextSendTime();
}
// Do not hold back in probing.
TimeDelta hold_back_window = TimeDelta::Zero();
if (!pacing_controller_.IsProbing()) {
hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
if (max_hold_back_window_in_packets_ != kNoPacketHoldback &&
!pacing_rate.IsZero() &&
packet_size_.filtered() != rtc::ExpFilter::kValueUndefined) {
TimeDelta avg_packet_send_time =
DataSize::Bytes(packet_size_.filtered()) / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
TimeDelta hold_back_window = max_hold_back_window_;
DataRate pacing_rate = pacing_controller_.pacing_rate();
DataSize avg_packet_size = DataSize::Bytes(packet_size_.filtered());
if (max_hold_back_window_in_packets_ > 0 && !pacing_rate.IsZero() &&
!avg_packet_size.IsZero()) {
TimeDelta avg_packet_send_time = avg_packet_size / pacing_rate;
hold_back_window =
std::min(hold_back_window,
avg_packet_send_time * max_hold_back_window_in_packets_);
}
absl::optional<TimeDelta> time_to_next_process;
if (pacing_controller_.IsProbing() &&
next_process_time != next_process_time_) {
// If we're probing and there isn't already a wakeup scheduled for the next
// process time, always post a task and just round sleep time down to
// nearest millisecond.
if (next_process_time.IsMinusInfinity()) {
time_to_next_process = TimeDelta::Zero();
} else {
time_to_next_process =
std::max(TimeDelta::Zero(),
(next_process_time - now).RoundDownTo(TimeDelta::Millis(1)));
}
} else if (next_process_time_.IsMinusInfinity() ||
next_process_time <= next_process_time_ - hold_back_window) {
// Schedule a new task since there is none currently scheduled
// (`next_process_time_` is infinite), or the new process time is at least
// one holdback window earlier than whatever is currently scheduled.
time_to_next_process = std::max(next_process_time - now, hold_back_window);
}
// Calculate next process time.
TimeDelta time_to_next_process =
std::max(hold_back_window, next_send_time - now - early_execute_margin);
next_send_time = now + time_to_next_process;
if (time_to_next_process) {
// Set a new scheduled process time and post a delayed task.
next_process_time_ = next_process_time;
// If no in flight task or in flight task is later than `next_send_time`,
// schedule a new one. Previous in flight task will be retired.
if (next_process_time_.IsMinusInfinity() ||
next_process_time_ > next_send_time) {
task_queue_.PostDelayedHighPrecisionTask(
[this, next_send_time]() { MaybeProcessPackets(next_send_time); },
time_to_next_process.RoundUpTo(TimeDelta::Millis(1)).ms<uint32_t>());
next_process_time_ = next_send_time;
[this, next_process_time]() { MaybeProcessPackets(next_process_time); },
time_to_next_process->ms<uint32_t>());
}
UpdateStats();
}
void TaskQueuePacedSender::UpdateStats() {

View File

@ -14,7 +14,9 @@
#include <stddef.h>
#include <stdint.h>
#include <functional>
#include <memory>
#include <queue>
#include <vector>
#include "absl/types/optional.h"
@ -27,6 +29,7 @@
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/numerics/exp_filter.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h"
@ -36,8 +39,6 @@ class RtcEventLog;
class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
public:
static const int kNoPacketHoldback;
// The `hold_back_window` parameter sets a lower bound on time to sleep if
// there is currently a pacer queue and packets can't immediately be
// processed. Increasing this reduces thread wakeups at the expense of higher
@ -50,7 +51,7 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
const WebRtcKeyValueConfig* field_trials,
TaskQueueFactory* task_queue_factory,
TimeDelta max_hold_back_window = PacingController::kMinSleepTime,
int max_hold_back_window_in_packets = kNoPacketHoldback);
int max_hold_back_window_in_packets = -1);
~TaskQueuePacedSender() override;
@ -155,7 +156,6 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// Filtered size of enqueued packets, in bytes.
rtc::ExpFilter packet_size_ RTC_GUARDED_BY(task_queue_);
bool include_overhead_ RTC_GUARDED_BY(task_queue_);
mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);

View File

@ -37,6 +37,7 @@ constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
constexpr size_t kDefaultPacketSize = 1234;
constexpr int kNoPacketHoldback = -1;
class MockPacketRouter : public PacketRouter {
public:
@ -119,7 +120,7 @@ TEST(TaskQueuePacedSenderTest, PacesPackets) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
// Insert a number of packets, covering one second.
static constexpr size_t kPacketsToSend = 42;
@ -159,7 +160,7 @@ TEST(TaskQueuePacedSenderTest, ReschedulesProcessOnRateChange) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
// Insert a number of packets to be sent 200ms apart.
const size_t kPacketsPerSecond = 5;
@ -211,7 +212,7 @@ TEST(TaskQueuePacedSenderTest, SendsAudioImmediately) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
const DataRate kPacingDataRate = DataRate::KilobitsPerSec(125);
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -240,11 +241,11 @@ TEST(TaskQueuePacedSenderTest, SleepsDuringCoalscingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -277,11 +278,11 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
const TimeDelta kCoalescingWindow = TimeDelta::Millis(5);
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
TaskQueuePacedSender pacer(
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
kCoalescingWindow, TaskQueuePacedSender::kNoPacketHoldback);
TaskQueuePacedSender pacer(time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr,
time_controller.GetTaskQueueFactory(),
kCoalescingWindow, kNoPacketHoldback);
// Set rates so one packet adds one ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -305,7 +306,7 @@ TEST(TaskQueuePacedSenderTest, ProbingOverridesCoalescingWindow) {
time_controller.AdvanceTime(kCoalescingWindow - TimeDelta::Millis(1));
}
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSetTime) {
ScopedFieldTrials trials("WebRTC-Bwe-ProbingBehavior/min_probe_delta:1ms/");
GlobalSimulatedTimeController time_controller(Timestamp::Millis(1234));
MockPacketRouter packet_router;
@ -313,7 +314,7 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -338,16 +339,15 @@ TEST(TaskQueuePacedSenderTest, SchedulesProbeAtSentTime) {
// Advance to less than 3ms before next packet send time.
time_controller.AdvanceTime(TimeDelta::Micros(1001));
// Trigger a probe at 2x the current pacing rate and insert the number of
// Trigger a probe at 4x the current pacing rate and insert the number of
// packets the probe needs.
const DataRate kProbeRate = 2 * kPacingDataRate;
const int kProbeClusterId = 1;
pacer.CreateProbeCluster(kProbeRate, kProbeClusterId);
// Expected size for each probe in a cluster is twice the expected bits sent
// during min_probe_delta.
// Expect one additional call since probe always starts with a small (1 byte)
// padding packet that's not counted into the probe rate here.
// Expected size for each probe in a cluster is twice the expected bits
// sent during min_probe_delta.
// Expect one additional call since probe always starts with a small
const TimeDelta kProbeTimeDelta = TimeDelta::Millis(2);
const DataSize kProbeSize = kProbeRate * kProbeTimeDelta;
const size_t kNumPacketsInProbe =
@ -381,7 +381,7 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
// Set rates so one packet adds 4ms of buffer level.
const DataSize kPacketSize = DataSize::Bytes(kDefaultPacketSize);
@ -423,8 +423,8 @@ TEST(TaskQueuePacedSenderTest, NoMinSleepTimeWhenProbing) {
// Verify the amount of probing data sent.
// Probe always starts with a small (1 byte) padding packet that's not
// counted into the probe rate here.
const DataSize kMinProbeSize = 2 * kMinProbeDelta * kProbingRate;
EXPECT_EQ(data_sent, DataSize::Bytes(1) + kPacketSize + 4 * kMinProbeSize);
EXPECT_EQ(data_sent,
kProbingRate * TimeDelta::Millis(1) + DataSize::Bytes(1));
}
TEST(TaskQueuePacedSenderTest, PacketBasedCoalescing) {
@ -534,7 +534,7 @@ TEST(TaskQueuePacedSenderTest, Stats) {
time_controller.GetClock(), &packet_router,
/*event_log=*/nullptr,
/*field_trials=*/nullptr, time_controller.GetTaskQueueFactory(),
PacingController::kMinSleepTime, TaskQueuePacedSender::kNoPacketHoldback);
PacingController::kMinSleepTime, kNoPacketHoldback);
// Simulate ~2mbps video stream, covering one second.
static constexpr size_t kPacketsToSend = 200;