Abstract the pacer queue behind an interface.

This will make it easier to extend testing, implement new features (e.g.
packet culling) and experiment with new variants.

Bug: webrtc:11340
Change-Id: I747f5f6cff61e11a420e43b06ffe0c4aba438c7b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/260116
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#36670}
This commit is contained in:
Erik Språng
2022-04-27 12:33:02 +02:00
committed by WebRTC LUCI CQ
parent a8ad11de82
commit cb56827c55
4 changed files with 112 additions and 110 deletions

View File

@ -18,6 +18,7 @@
#include "absl/strings/match.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/logging.h"
@ -96,8 +97,9 @@ PacingController::PacingController(Clock* clock,
pacing_bitrate_(DataRate::Zero()),
last_process_time_(clock->CurrentTime()),
last_send_time_(last_process_time_),
packet_queue_(last_process_time_),
packet_counter_(0),
seen_first_packet_(false),
packet_queue_(
std::make_unique<RoundRobinPacketQueue>(last_process_time_)),
congested_(false),
queue_time_limit_(kMaxExpectedQueueLength),
account_for_audio_(false),
@ -123,14 +125,14 @@ void PacingController::Pause() {
if (!paused_)
RTC_LOG(LS_INFO) << "PacedSender paused.";
paused_ = true;
packet_queue_.SetPauseState(true, CurrentTime());
packet_queue_->SetPauseState(true, CurrentTime());
}
void PacingController::Resume() {
if (paused_)
RTC_LOG(LS_INFO) << "PacedSender resumed.";
paused_ = false;
packet_queue_.SetPauseState(false, CurrentTime());
packet_queue_->SetPauseState(false, CurrentTime());
}
bool PacingController::IsPaused() const {
@ -162,7 +164,7 @@ Timestamp PacingController::CurrentTime() const {
}
void PacingController::SetProbingEnabled(bool enabled) {
RTC_CHECK_EQ(0, packet_counter_);
RTC_CHECK(!seen_first_packet_);
prober_.SetEnabled(enabled);
}
@ -196,7 +198,7 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
Timestamp now = CurrentTime();
if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty()) {
if (mode_ == ProcessMode::kDynamic && packet_queue_->Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
// packet.
@ -209,7 +211,8 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
packet_queue_.Push(now, packet_counter_++, std::move(packet));
packet_queue_->Push(now, std::move(packet));
seen_first_packet_ = true;
}
void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
@ -218,14 +221,12 @@ void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
void PacingController::SetIncludeOverhead() {
include_overhead_ = true;
packet_queue_.SetIncludeOverhead();
}
void PacingController::SetTransportOverhead(DataSize overhead_per_packet) {
if (ignore_transport_overhead_)
return;
transport_overhead_per_packet_ = overhead_per_packet;
packet_queue_.SetTransportOverhead(overhead_per_packet);
}
TimeDelta PacingController::ExpectedQueueTime() const {
@ -236,11 +237,16 @@ TimeDelta PacingController::ExpectedQueueTime() const {
}
size_t PacingController::QueueSizePackets() const {
return packet_queue_.SizeInPackets();
return packet_queue_->SizeInPackets();
}
DataSize PacingController::QueueSizeData() const {
return packet_queue_.Size();
DataSize size = packet_queue_->SizeInPayloadBytes();
if (include_overhead_) {
size += static_cast<int64_t>(packet_queue_->SizeInPackets()) *
transport_overhead_per_packet_;
}
return size;
}
DataSize PacingController::CurrentBufferLevel() const {
@ -252,7 +258,7 @@ absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
}
Timestamp PacingController::OldestPacketEnqueueTime() const {
return packet_queue_.OldestEnqueueTime();
return packet_queue_->OldestEnqueueTime();
}
TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
@ -273,8 +279,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
}
bool PacingController::ShouldSendKeepalive(Timestamp now) const {
if (send_padding_if_silent_ || paused_ || congested_ ||
packet_counter_ == 0) {
if (send_padding_if_silent_ || paused_ || congested_ || !seen_first_packet_) {
// We send a padding packet every 500 ms to ensure we won't get stuck in
// congested state due to no feedback being received.
if (now - last_send_time_ >= kCongestedPacketInterval) {
@ -310,22 +315,22 @@ Timestamp PacingController::NextSendTime() const {
// Not pacing audio, if leading packet is audio its target send
// time is the time at which it was enqueued.
absl::optional<Timestamp> unpaced_audio_time =
pace_audio_ ? absl::nullopt
: packet_queue_.LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time) {
return *unpaced_audio_time;
Timestamp unpaced_audio_time =
pace_audio_ ? Timestamp::PlusInfinity()
: packet_queue_->LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time.IsFinite()) {
return unpaced_audio_time;
}
if (congested_ || packet_counter_ == 0) {
if (congested_ || !seen_first_packet_) {
// We need to at least send keep-alive packets with some interval.
return last_send_time_ + kCongestedPacketInterval;
}
if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
// Check how long until we can send the next media packet.
next_send_time = last_process_time_ + media_debt_ / media_rate_;
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
} else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
@ -361,7 +366,7 @@ void PacingController::ProcessPackets() {
DataSize keepalive_data_sent = DataSize::Zero();
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
if (packet_counter_ > 0) {
if (seen_first_packet_) {
std::vector<std::unique_ptr<RtpPacketToSend>> keepalive_packets =
packet_sender_->GeneratePadding(DataSize::Bytes(1));
for (auto& packet : keepalive_packets) {
@ -397,16 +402,16 @@ void PacingController::ProcessPackets() {
if (elapsed_time > TimeDelta::Zero()) {
DataRate target_rate = pacing_bitrate_;
DataSize queue_size_data = packet_queue_.Size();
DataSize queue_size_data = QueueSizeData();
if (queue_size_data > DataSize::Zero()) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateQueueTime(now);
packet_queue_->UpdateAverageQueueTime(now);
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_.AverageQueueTime());
queue_time_limit_ - packet_queue_->AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > target_rate) {
target_rate = min_rate_needed;
@ -545,7 +550,7 @@ void PacingController::ProcessPackets() {
DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const {
if (!packet_queue_.Empty()) {
if (!packet_queue_->Empty()) {
// Actual payload available, no need to add padding.
return DataSize::Zero();
}
@ -555,7 +560,7 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
return DataSize::Zero();
}
if (packet_counter_ == 0) {
if (!seen_first_packet_) {
// We can not send padding unless a normal packet has first been sent. If
// we do, timestamps get messed up.
return DataSize::Zero();
@ -597,7 +602,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
}
}
if (packet_queue_.Empty()) {
if (packet_queue_->Empty()) {
return nullptr;
}
@ -605,7 +610,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
// Unpaced audio packets and probes are exempted from send checks.
bool unpaced_audio_packet =
!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value();
!pace_audio_ && packet_queue_->LeadingAudioPacketEnqueueTime().IsFinite();
if (!unpaced_audio_packet && !is_probe) {
if (congested_) {
// Don't send anything if congested.
@ -631,7 +636,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
}
}
return packet_queue_.Pop();
return packet_queue_->Pop();
}
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,

View File

@ -25,7 +25,6 @@
#include "api/transport/network_types.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@ -60,6 +59,46 @@ class PacingController {
DataSize size) = 0;
};
// Interface for class hanlding storage of and prioritization of packets
// pending to be sent by the pacer.
// Note that for the methods taking a Timestamp as parameter, the parameter
// will never decrease between two subsequent calls.
class PacketQueue {
public:
virtual ~PacketQueue() = default;
virtual void Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) = 0;
virtual std::unique_ptr<RtpPacketToSend> Pop() = 0;
virtual size_t SizeInPackets() const = 0;
bool Empty() const { return SizeInPackets() == 0; }
virtual DataSize SizeInPayloadBytes() const = 0;
// If the next packet, that would be returned by Pop() if called
// now, is an audio packet this method returns the enqueue time
// of that packet. If queue is empty or top packet is not audio,
// returns Timestamp::MinusInfinity().
virtual Timestamp LeadingAudioPacketEnqueueTime() const = 0;
// Enqueue time of the oldest packet in the queue,
// Timestamp::MinusInfinity() if queue is empty.
virtual Timestamp OldestEnqueueTime() const = 0;
// Average queue time for the packets currently in the queue.
// The queuing time is calculated from Push() to the last UpdateQueueTime()
// call - with any time spent in a paused state subtracted.
virtual TimeDelta AverageQueueTime() const = 0;
// Called during packet processing or when pause stats changes. Since the
// AverageQueueTime() method does not look at the wall time, this method
// needs to be called before querying queue time.
virtual void UpdateAverageQueueTime(Timestamp now) = 0;
// Set the pause state, while `paused` is true queuing time is not counted.
virtual void SetPauseState(bool paused, Timestamp now) = 0;
};
// Expected max pacer delay. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
@ -219,9 +258,9 @@ class PacingController {
Timestamp last_process_time_;
Timestamp last_send_time_;
absl::optional<Timestamp> first_sent_packet_time_;
bool seen_first_packet_;
RoundRobinPacketQueue packet_queue_;
uint64_t packet_counter_;
std::unique_ptr<PacketQueue> packet_queue_;
bool congested_;

View File

@ -53,7 +53,7 @@ RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
int64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::unique_ptr<RtpPacketToSend> packet)
: priority_(priority),
@ -94,7 +94,7 @@ bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const {
return Type() == RtpPacketMediaType::kRetransmission;
}
uint64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
int64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
return enqueue_order_;
}
@ -134,6 +134,7 @@ RoundRobinPacketQueue::Stream::~Stream() = default;
RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time)
: transport_overhead_per_packet_(DataSize::Zero()),
time_last_updated_(start_time),
enqueue_count_(0),
paused_(false),
size_packets_(0),
size_(DataSize::Zero()),
@ -144,28 +145,27 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time)
RoundRobinPacketQueue::~RoundRobinPacketQueue() {
// Make sure to release any packets owned by raw pointer in QueuedPacket.
while (!Empty()) {
while (size_packets_ > 0) {
Pop();
}
}
void RoundRobinPacketQueue::Push(Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
RTC_DCHECK(packet->packet_type().has_value());
int priority = GetPriorityForType(*packet->packet_type());
if (size_packets_ == 0) {
// Single packet fast-path.
single_packet_queue_.emplace(
QueuedPacket(priority, enqueue_time, enqueue_order,
QueuedPacket(priority, enqueue_time, enqueue_count_++,
enqueue_times_.end(), std::move(packet)));
UpdateQueueTime(enqueue_time);
UpdateAverageQueueTime(enqueue_time);
single_packet_queue_->SubtractPauseTime(pause_time_sum_);
size_packets_ = 1;
size_ += PacketSize(*single_packet_queue_);
} else {
MaybePromoteSinglePacketToNormalQueue();
Push(QueuedPacket(priority, enqueue_time, enqueue_order,
Push(QueuedPacket(priority, enqueue_time, enqueue_count_++,
enqueue_times_.insert(enqueue_time), std::move(packet)));
}
}
@ -182,7 +182,7 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
return rtp_packet;
}
RTC_DCHECK(!Empty());
RTC_DCHECK_GT(size_packets_, 0u);
Stream* stream = GetHighestPriorityStream();
const QueuedPacket& queued_packet = stream->packet_queue.top();
@ -231,34 +231,24 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
return rtp_packet;
}
bool RoundRobinPacketQueue::Empty() const {
if (size_packets_ == 0) {
RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty());
return true;
}
RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty());
return false;
}
size_t RoundRobinPacketQueue::SizeInPackets() const {
return size_packets_;
}
DataSize RoundRobinPacketQueue::Size() const {
DataSize RoundRobinPacketQueue::SizeInPayloadBytes() const {
return size_;
}
absl::optional<Timestamp> RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime()
const {
Timestamp RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() const {
if (single_packet_queue_.has_value()) {
if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) {
return single_packet_queue_->EnqueueTime();
}
return absl::nullopt;
return Timestamp::MinusInfinity();
}
if (stream_priorities_.empty()) {
return absl::nullopt;
return Timestamp::MinusInfinity();
}
uint32_t ssrc = stream_priorities_.begin()->second;
@ -266,7 +256,7 @@ absl::optional<Timestamp> RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime()
if (top_packet.Type() == RtpPacketMediaType::kAudio) {
return top_packet.EnqueueTime();
}
return absl::nullopt;
return Timestamp::MinusInfinity();
}
Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
@ -274,13 +264,13 @@ Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
return single_packet_queue_->EnqueueTime();
}
if (Empty())
if (size_packets_ == 0)
return Timestamp::MinusInfinity();
RTC_CHECK(!enqueue_times_.empty());
return *enqueue_times_.begin();
}
void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
void RoundRobinPacketQueue::UpdateAverageQueueTime(Timestamp now) {
RTC_CHECK_GE(now, time_last_updated_);
if (now == time_last_updated_)
return;
@ -299,38 +289,12 @@ void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) {
void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
if (paused_ == paused)
return;
UpdateQueueTime(now);
UpdateAverageQueueTime(now);
paused_ = paused;
}
void RoundRobinPacketQueue::SetIncludeOverhead() {
MaybePromoteSinglePacketToNormalQueue();
include_overhead_ = true;
// We need to update the size to reflect overhead for existing packets.
for (const auto& stream : streams_) {
for (const QueuedPacket& packet : stream.second.packet_queue) {
size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
transport_overhead_per_packet_;
}
}
}
void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) {
MaybePromoteSinglePacketToNormalQueue();
if (include_overhead_) {
DataSize previous_overhead = transport_overhead_per_packet_;
// We need to update the size to reflect overhead for existing packets.
for (const auto& stream : streams_) {
int packets = stream.second.packet_queue.size();
size_ -= packets * previous_overhead;
size_ += packets * overhead_per_packet;
}
}
transport_overhead_per_packet_ = overhead_per_packet;
}
TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
if (Empty())
if (size_packets_ == 0)
return TimeDelta::Zero();
return queue_time_sum_ / size_packets_;
}
@ -371,7 +335,7 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) {
// the total amount of time the queue has been paused at that moment. This
// way we subtract the total amount of time the packet has spent in the
// queue while in a paused state.
UpdateQueueTime(packet.EnqueueTime());
UpdateAverageQueueTime(packet.EnqueueTime());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;

View File

@ -25,44 +25,36 @@
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "system_wrappers/include/clock.h"
namespace webrtc {
class RoundRobinPacketQueue {
class RoundRobinPacketQueue : public PacingController::PacketQueue {
public:
explicit RoundRobinPacketQueue(Timestamp start_time);
~RoundRobinPacketQueue();
void Push(Timestamp enqueue_time,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet);
std::unique_ptr<RtpPacketToSend> Pop();
std::unique_ptr<RtpPacketToSend> packet) override;
std::unique_ptr<RtpPacketToSend> Pop() override;
bool Empty() const;
size_t SizeInPackets() const;
DataSize Size() const;
// If the next packet, that would be returned by Pop() if called
// now, is an audio packet this method returns the enqueue time
// of that packet. If queue is empty or top packet is not audio,
// returns nullopt.
absl::optional<Timestamp> LeadingAudioPacketEnqueueTime() const;
Timestamp OldestEnqueueTime() const;
TimeDelta AverageQueueTime() const;
void UpdateQueueTime(Timestamp now);
void SetPauseState(bool paused, Timestamp now);
void SetIncludeOverhead();
void SetTransportOverhead(DataSize overhead_per_packet);
size_t SizeInPackets() const override;
DataSize SizeInPayloadBytes() const override;
Timestamp LeadingAudioPacketEnqueueTime() const override;
Timestamp OldestEnqueueTime() const override;
TimeDelta AverageQueueTime() const override;
void UpdateAverageQueueTime(Timestamp now) override;
void SetPauseState(bool paused, Timestamp now) override;
private:
struct QueuedPacket {
public:
QueuedPacket(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
int64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::unique_ptr<RtpPacketToSend> packet);
QueuedPacket(const QueuedPacket& rhs);
@ -75,7 +67,7 @@ class RoundRobinPacketQueue {
uint32_t Ssrc() const;
Timestamp EnqueueTime() const;
bool IsRetransmission() const;
uint64_t EnqueueOrder() const;
int64_t EnqueueOrder() const;
RtpPacketToSend* RtpPacket() const;
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const;
@ -85,7 +77,7 @@ class RoundRobinPacketQueue {
private:
int priority_;
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
uint64_t enqueue_order_;
int64_t enqueue_order_;
bool is_retransmission_; // Cached for performance.
std::multiset<Timestamp>::iterator enqueue_time_it_;
// Raw pointer since priority_queue doesn't allow for moving
@ -147,6 +139,8 @@ class RoundRobinPacketQueue {
Timestamp time_last_updated_;
int64_t enqueue_count_;
bool paused_;
size_t size_packets_;
DataSize size_;