Remove unused class RoundRobinPacketQueue.

This removes the field trial WebRTC-Pacer-UsePrioritizedPacketQueue.

Bug: webrtc:11340
Change-Id: I9a7ee64ff5ae3ad1fee6ed5d552ec681e3b4b534
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272240
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37852}
This commit is contained in:
Erik Språng
2022-08-18 13:48:56 +02:00
committed by WebRTC LUCI CQ
parent 8f809860aa
commit a9627e770e
8 changed files with 72 additions and 766 deletions

View File

@ -23,8 +23,6 @@ rtc_library("pacing") {
"packet_router.h",
"prioritized_packet_queue.cc",
"prioritized_packet_queue.h",
"round_robin_packet_queue.cc",
"round_robin_packet_queue.h",
"rtp_packet_pacer.h",
"task_queue_paced_sender.cc",
"task_queue_paced_sender.h",
@ -91,7 +89,6 @@ if (rtc_include_tests) {
"pacing_controller_unittest.cc",
"packet_router_unittest.cc",
"prioritized_packet_queue_unittest.cc",
"round_robin_packet_queue_unittest.cc",
"task_queue_paced_sender_unittest.cc",
]
deps = [

View File

@ -18,8 +18,6 @@
#include "absl/strings/match.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/prioritized_packet_queue.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "rtc_base/checks.h"
#include "rtc_base/experiments/field_trial_parser.h"
#include "rtc_base/logging.h"
@ -45,15 +43,6 @@ bool IsEnabled(const FieldTrialsView& field_trials, absl::string_view key) {
return absl::StartsWith(field_trials.Lookup(key), "Enabled");
}
std::unique_ptr<PacingController::PacketQueue> CreatePacketQueue(
const FieldTrialsView& field_trials,
Timestamp creation_time) {
if (field_trials.IsDisabled("WebRTC-Pacer-UsePrioritizedPacketQueue")) {
return std::make_unique<RoundRobinPacketQueue>(creation_time);
}
return std::make_unique<PrioritizedPacketQueue>(creation_time);
}
} // namespace
const TimeDelta PacingController::kMaxExpectedQueueLength =
@ -93,7 +82,7 @@ PacingController::PacingController(Clock* clock,
last_process_time_(clock->CurrentTime()),
last_send_time_(last_process_time_),
seen_first_packet_(false),
packet_queue_(CreatePacketQueue(field_trials_, last_process_time_)),
packet_queue_(/*creation_time=*/last_process_time_),
congested_(false),
queue_time_limit_(kMaxExpectedQueueLength),
account_for_audio_(false),
@ -130,14 +119,14 @@ void PacingController::Pause() {
if (!paused_)
RTC_LOG(LS_INFO) << "PacedSender paused.";
paused_ = true;
packet_queue_->SetPauseState(true, CurrentTime());
packet_queue_.SetPauseState(true, CurrentTime());
}
void PacingController::Resume() {
if (paused_)
RTC_LOG(LS_INFO) << "PacedSender resumed.";
paused_ = false;
packet_queue_->SetPauseState(false, CurrentTime());
packet_queue_.SetPauseState(false, CurrentTime());
}
bool PacingController::IsPaused() const {
@ -207,7 +196,7 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size()));
const Timestamp now = CurrentTime();
if (packet_queue_->Empty()) {
if (packet_queue_.Empty()) {
// If queue is empty, we need to "fast-forward" the last process time,
// so that we don't use passed time as budget for sending the first new
// packet.
@ -220,7 +209,7 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
}
UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time));
}
packet_queue_->Push(now, std::move(packet));
packet_queue_.Push(now, std::move(packet));
seen_first_packet_ = true;
// Queue length has increased, check if we need to change the pacing rate.
@ -251,18 +240,18 @@ TimeDelta PacingController::ExpectedQueueTime() const {
}
size_t PacingController::QueueSizePackets() const {
return rtc::checked_cast<size_t>(packet_queue_->SizeInPackets());
return rtc::checked_cast<size_t>(packet_queue_.SizeInPackets());
}
const std::array<int, kNumMediaTypes>&
PacingController::SizeInPacketsPerRtpPacketMediaType() const {
return packet_queue_->SizeInPacketsPerRtpPacketMediaType();
return packet_queue_.SizeInPacketsPerRtpPacketMediaType();
}
DataSize PacingController::QueueSizeData() const {
DataSize size = packet_queue_->SizeInPayloadBytes();
DataSize size = packet_queue_.SizeInPayloadBytes();
if (include_overhead_) {
size += static_cast<int64_t>(packet_queue_->SizeInPackets()) *
size += static_cast<int64_t>(packet_queue_.SizeInPackets()) *
transport_overhead_per_packet_;
}
return size;
@ -277,7 +266,7 @@ absl::optional<Timestamp> PacingController::FirstSentPacketTime() const {
}
Timestamp PacingController::OldestPacketEnqueueTime() const {
return packet_queue_->OldestEnqueueTime();
return packet_queue_.OldestEnqueueTime();
}
TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) {
@ -328,7 +317,7 @@ Timestamp PacingController::NextSendTime() const {
// time is the time at which it was enqueued.
Timestamp unpaced_audio_time =
pace_audio_ ? Timestamp::PlusInfinity()
: packet_queue_->LeadingAudioPacketEnqueueTime();
: packet_queue_.LeadingAudioPacketEnqueueTime();
if (unpaced_audio_time.IsFinite()) {
return unpaced_audio_time;
}
@ -338,7 +327,7 @@ Timestamp PacingController::NextSendTime() const {
return last_send_time_ + kCongestedPacketInterval;
}
if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) {
if (adjusted_media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) {
// If packets are allowed to be sent in a burst, the
// debt is allowed to grow up to one packet more than what can be sent
// during 'send_burst_period_'.
@ -346,7 +335,7 @@ Timestamp PacingController::NextSendTime() const {
next_send_time =
last_process_time_ +
((send_burst_interval_ > drain_time) ? TimeDelta::Zero() : drain_time);
} else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) {
} else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) {
// If we _don't_ have pending packets, check how long until we have
// bandwidth for padding packets. Both media and padding debts must
// have been drained to do this.
@ -539,7 +528,7 @@ void PacingController::ProcessPackets() {
DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size,
DataSize data_sent) const {
if (!packet_queue_->Empty()) {
if (!packet_queue_.Empty()) {
// Actual payload available, no need to add padding.
return DataSize::Zero();
}
@ -588,7 +577,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
}
}
if (packet_queue_->Empty()) {
if (packet_queue_.Empty()) {
return nullptr;
}
@ -596,7 +585,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
// Unpaced audio packets and probes are exempted from send checks.
bool unpaced_audio_packet =
!pace_audio_ && packet_queue_->LeadingAudioPacketEnqueueTime().IsFinite();
!pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().IsFinite();
if (!unpaced_audio_packet && !is_probe) {
if (congested_) {
// Don't send anything if congested.
@ -616,7 +605,7 @@ std::unique_ptr<RtpPacketToSend> PacingController::GetPendingPacket(
}
}
return packet_queue_->Pop();
return packet_queue_.Pop();
}
void PacingController::OnPacketSent(RtpPacketMediaType packet_type,
@ -665,10 +654,10 @@ void PacingController::MaybeUpdateMediaRateDueToLongQueue(Timestamp now) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_->UpdateAverageQueueTime(now);
packet_queue_.UpdateAverageQueueTime(now);
TimeDelta avg_time_left =
std::max(TimeDelta::Millis(1),
queue_time_limit_ - packet_queue_->AverageQueueTime());
queue_time_limit_ - packet_queue_.AverageQueueTime());
DataRate min_rate_needed = queue_size_data / avg_time_left;
if (min_rate_needed > pacing_rate_) {
adjusted_media_rate_ = min_rate_needed;

View File

@ -26,6 +26,7 @@
#include "api/transport/network_types.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/prioritized_packet_queue.h"
#include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_packet_sender.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
@ -53,52 +54,6 @@ class PacingController {
DataSize size) = 0;
};
// Interface for class hanlding storage of and prioritization of packets
// pending to be sent by the pacer.
// Note that for the methods taking a Timestamp as parameter, the parameter
// will never decrease between two subsequent calls.
class PacketQueue {
public:
virtual ~PacketQueue() = default;
virtual void Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) = 0;
virtual std::unique_ptr<RtpPacketToSend> Pop() = 0;
virtual int SizeInPackets() const = 0;
bool Empty() const { return SizeInPackets() == 0; }
virtual DataSize SizeInPayloadBytes() const = 0;
// Total packets in the queue per media type (RtpPacketMediaType values are
// used as lookup index).
virtual const std::array<int, kNumMediaTypes>&
SizeInPacketsPerRtpPacketMediaType() const = 0;
// If the next packet, that would be returned by Pop() if called
// now, is an audio packet this method returns the enqueue time
// of that packet. If queue is empty or top packet is not audio,
// returns Timestamp::MinusInfinity().
virtual Timestamp LeadingAudioPacketEnqueueTime() const = 0;
// Enqueue time of the oldest packet in the queue,
// Timestamp::MinusInfinity() if queue is empty.
virtual Timestamp OldestEnqueueTime() const = 0;
// Average queue time for the packets currently in the queue.
// The queuing time is calculated from Push() to the last UpdateQueueTime()
// call - with any time spent in a paused state subtracted.
// Returns TimeDelta::Zero() for an empty queue.
virtual TimeDelta AverageQueueTime() const = 0;
// Called during packet processing or when pause stats changes. Since the
// AverageQueueTime() method does not look at the wall time, this method
// needs to be called before querying queue time.
virtual void UpdateAverageQueueTime(Timestamp now) = 0;
// Set the pause state, while `paused` is true queuing time is not counted.
virtual void SetPauseState(bool paused, Timestamp now) = 0;
};
// Expected max pacer delay. If ExpectedQueueTime() is higher than
// this value, the packet producers should wait (eg drop frames rather than
// encoding them). Bitrate sent may temporarily exceed target set by
@ -260,7 +215,7 @@ class PacingController {
absl::optional<Timestamp> first_sent_packet_time_;
bool seen_first_packet_;
std::unique_ptr<PacketQueue> packet_queue_;
PrioritizedPacketQueue packet_queue_;
bool congested_;

View File

@ -216,6 +216,10 @@ DataSize PrioritizedPacketQueue::SizeInPayloadBytes() const {
return size_payload_;
}
bool PrioritizedPacketQueue::Empty() const {
return size_packets_ == 0;
}
const std::array<int, kNumMediaTypes>&
PrioritizedPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
return size_packets_per_media_type_;

View File

@ -21,29 +21,63 @@
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
namespace webrtc {
class PrioritizedPacketQueue : public PacingController::PacketQueue {
class PrioritizedPacketQueue {
public:
explicit PrioritizedPacketQueue(Timestamp creation_time);
PrioritizedPacketQueue(const PrioritizedPacketQueue&) = delete;
PrioritizedPacketQueue& operator=(const PrioritizedPacketQueue&) = delete;
void Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) override;
std::unique_ptr<RtpPacketToSend> Pop() override;
int SizeInPackets() const override;
DataSize SizeInPayloadBytes() const override;
// Add a packet to the queue. The enqueue time is used for queue time stats
// and to report the leading packet enqueue time per packet type.
void Push(Timestamp enqueue_time, std::unique_ptr<RtpPacketToSend> packet);
// Remove the next packet from the queue. Packets a prioritized first
// according to packet type, in the following order:
// - audio, retransmissions, video / fec, padding
// For each packet type, we use one FIFO-queue per SSRC and emit from
// those queues in a round-robin fashion.
std::unique_ptr<RtpPacketToSend> Pop();
// Number of packets in the queue.
int SizeInPackets() const;
// Sum of all payload bytes in the queue, where the payload is calculated
// as `packet->payload_size() + packet->padding_size()`.
DataSize SizeInPayloadBytes() const;
// Convenience method for `SizeInPackets() == 0`.
bool Empty() const;
// Total packets in the queue per media type (RtpPacketMediaType values are
// used as lookup index).
const std::array<int, kNumMediaTypes>& SizeInPacketsPerRtpPacketMediaType()
const override;
Timestamp LeadingAudioPacketEnqueueTime() const override;
Timestamp OldestEnqueueTime() const override;
TimeDelta AverageQueueTime() const override;
void UpdateAverageQueueTime(Timestamp now) override;
void SetPauseState(bool paused, Timestamp now) override;
const;
// The enqueue time of the next audio packet this queue will return via the
// Pop() method. If queue has no audio packets, returns MinusInfinity().
Timestamp LeadingAudioPacketEnqueueTime() const;
// Enqueue time of the oldest packet in the queue,
// Timestamp::MinusInfinity() if queue is empty.
Timestamp OldestEnqueueTime() const;
// Average queue time for the packets currently in the queue.
// The queuing time is calculated from Push() to the last UpdateQueueTime()
// call - with any time spent in a paused state subtracted.
// Returns TimeDelta::Zero() for an empty queue.
TimeDelta AverageQueueTime() const;
// Called during packet processing or when pause stats changes. Since the
// AverageQueueTime() method does not look at the wall time, this method
// needs to be called before querying queue time.
void UpdateAverageQueueTime(Timestamp now);
// Set the pause state, while `paused` is true queuing time is not counted.
void SetPauseState(bool paused, Timestamp now);
private:
static constexpr int kNumPriorityLevels = 4;

View File

@ -1,403 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/round_robin_packet_queue.h"
#include <algorithm>
#include <cstdint>
#include <utility>
#include "absl/strings/match.h"
#include "rtc_base/checks.h"
namespace webrtc {
namespace {
static constexpr DataSize kMaxLeadingSize = DataSize::Bytes(1400);
int GetPriorityForType(RtpPacketMediaType type) {
// Lower number takes priority over higher.
switch (type) {
case RtpPacketMediaType::kAudio:
// Audio is always prioritized over other packet types.
return 0;
case RtpPacketMediaType::kRetransmission:
// Send retransmissions before new media.
return 1;
case RtpPacketMediaType::kVideo:
case RtpPacketMediaType::kForwardErrorCorrection:
// Video has "normal" priority, in the old speak.
// Send redundancy concurrently to video. If it is delayed it might have a
// lower chance of being useful.
return 2;
case RtpPacketMediaType::kPadding:
// Packets that are in themselves likely useless, only sent to keep the
// BWE high.
return 3;
}
RTC_CHECK_NOTREACHED();
}
} // namespace
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
default;
RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
int priority,
Timestamp enqueue_time,
int64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::unique_ptr<RtpPacketToSend> packet)
: priority_(priority),
enqueue_time_(enqueue_time),
enqueue_order_(enqueue_order),
is_retransmission_(packet->packet_type() ==
RtpPacketMediaType::kRetransmission),
enqueue_time_it_(enqueue_time_it),
owned_packet_(packet.release()) {}
bool RoundRobinPacketQueue::QueuedPacket::operator<(
const RoundRobinPacketQueue::QueuedPacket& other) const {
if (priority_ != other.priority_)
return priority_ > other.priority_;
if (is_retransmission_ != other.is_retransmission_)
return other.is_retransmission_;
return enqueue_order_ > other.enqueue_order_;
}
int RoundRobinPacketQueue::QueuedPacket::Priority() const {
return priority_;
}
RtpPacketMediaType RoundRobinPacketQueue::QueuedPacket::Type() const {
return *owned_packet_->packet_type();
}
uint32_t RoundRobinPacketQueue::QueuedPacket::Ssrc() const {
return owned_packet_->Ssrc();
}
Timestamp RoundRobinPacketQueue::QueuedPacket::EnqueueTime() const {
return enqueue_time_;
}
bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const {
return Type() == RtpPacketMediaType::kRetransmission;
}
int64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const {
return enqueue_order_;
}
RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const {
return owned_packet_;
}
void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator(
std::multiset<Timestamp>::iterator it) {
enqueue_time_it_ = it;
}
std::multiset<Timestamp>::iterator
RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const {
return enqueue_time_it_;
}
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
TimeDelta pause_time_sum) {
enqueue_time_ -= pause_time_sum;
}
RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
RoundRobinPacketQueue::PriorityPacketQueue::begin() const {
return c.begin();
}
RoundRobinPacketQueue::PriorityPacketQueue::const_iterator
RoundRobinPacketQueue::PriorityPacketQueue::end() const {
return c.end();
}
RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
RoundRobinPacketQueue::Stream::~Stream() = default;
RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time)
: transport_overhead_per_packet_(DataSize::Zero()),
time_last_updated_(start_time),
enqueue_count_(0),
paused_(false),
size_packets_(0),
size_packets_per_media_type_({}),
size_(DataSize::Zero()),
max_size_(kMaxLeadingSize),
queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()),
include_overhead_(false) {}
RoundRobinPacketQueue::~RoundRobinPacketQueue() {
// Make sure to release any packets owned by raw pointer in QueuedPacket.
while (size_packets_ > 0) {
Pop();
}
}
void RoundRobinPacketQueue::Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) {
RTC_CHECK(packet->packet_type().has_value());
RtpPacketMediaType packet_type = packet->packet_type().value();
int priority = GetPriorityForType(packet_type);
if (size_packets_ == 0) {
// Single packet fast-path.
single_packet_queue_.emplace(
QueuedPacket(priority, enqueue_time, enqueue_count_++,
enqueue_times_.end(), std::move(packet)));
UpdateAverageQueueTime(enqueue_time);
single_packet_queue_->SubtractPauseTime(pause_time_sum_);
size_packets_ = 1;
++size_packets_per_media_type_[static_cast<size_t>(packet_type)];
size_ += PacketSize(*single_packet_queue_);
} else {
MaybePromoteSinglePacketToNormalQueue();
Push(QueuedPacket(priority, enqueue_time, enqueue_count_++,
enqueue_times_.insert(enqueue_time), std::move(packet)));
}
}
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
if (single_packet_queue_.has_value()) {
RTC_DCHECK(stream_priorities_.empty());
std::unique_ptr<RtpPacketToSend> rtp_packet(
single_packet_queue_->RtpPacket());
single_packet_queue_.reset();
queue_time_sum_ = TimeDelta::Zero();
size_packets_ = 0;
RTC_CHECK(rtp_packet->packet_type().has_value());
RtpPacketMediaType packet_type = rtp_packet->packet_type().value();
size_packets_per_media_type_[static_cast<size_t>(packet_type)] -= 1;
RTC_CHECK_GE(size_packets_per_media_type_[static_cast<size_t>(packet_type)],
0);
size_ = DataSize::Zero();
return rtp_packet;
}
RTC_DCHECK_GT(size_packets_, 0);
Stream* stream = GetHighestPriorityStream();
const QueuedPacket& queued_packet = stream->packet_queue.top();
stream_priorities_.erase(stream->priority_it);
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the `pause_time_sum_ms_` was
// subtracted from `packet.enqueue_time_ms` when the packet was pushed, and
// by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state.
TimeDelta time_in_non_paused_state =
time_last_updated_ - queued_packet.EnqueueTime() - pause_time_sum_;
queue_time_sum_ -= time_in_non_paused_state;
RTC_CHECK(queued_packet.EnqueueTimeIterator() != enqueue_times_.end());
enqueue_times_.erase(queued_packet.EnqueueTimeIterator());
// Update `bytes` of this stream. The general idea is that the stream that
// has sent the least amount of bytes should have the highest priority.
// The problem with that is if streams send with different rates, in which
// case a "budget" will be built up for the stream sending at the lower
// rate. To avoid building a too large budget we limit `bytes` to be within
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
DataSize packet_size = PacketSize(queued_packet);
stream->size =
std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
max_size_ = std::max(max_size_, stream->size);
size_ -= packet_size;
size_packets_ -= 1;
size_packets_per_media_type_[static_cast<size_t>(queued_packet.Type())] -= 1;
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
RTC_CHECK_GE(
size_packets_per_media_type_[static_cast<size_t>(queued_packet.Type())],
0);
std::unique_ptr<RtpPacketToSend> rtp_packet(queued_packet.RtpPacket());
stream->packet_queue.pop();
// If there are packets left to be sent, schedule the stream again.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
if (stream->packet_queue.empty()) {
stream->priority_it = stream_priorities_.end();
} else {
int priority = stream->packet_queue.top().Priority();
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(priority, stream->size), stream->ssrc);
}
return rtp_packet;
}
int RoundRobinPacketQueue::SizeInPackets() const {
return size_packets_;
}
DataSize RoundRobinPacketQueue::SizeInPayloadBytes() const {
return size_;
}
const std::array<int, kNumMediaTypes>&
RoundRobinPacketQueue::SizeInPacketsPerRtpPacketMediaType() const {
return size_packets_per_media_type_;
}
Timestamp RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() const {
if (single_packet_queue_.has_value()) {
if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) {
return single_packet_queue_->EnqueueTime();
}
return Timestamp::MinusInfinity();
}
if (stream_priorities_.empty()) {
return Timestamp::MinusInfinity();
}
uint32_t ssrc = stream_priorities_.begin()->second;
const auto& top_packet = streams_.find(ssrc)->second.packet_queue.top();
if (top_packet.Type() == RtpPacketMediaType::kAudio) {
return top_packet.EnqueueTime();
}
return Timestamp::MinusInfinity();
}
Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
if (single_packet_queue_.has_value()) {
return single_packet_queue_->EnqueueTime();
}
if (size_packets_ == 0)
return Timestamp::MinusInfinity();
RTC_CHECK(!enqueue_times_.empty());
return *enqueue_times_.begin();
}
void RoundRobinPacketQueue::UpdateAverageQueueTime(Timestamp now) {
RTC_CHECK_GE(now, time_last_updated_);
if (now == time_last_updated_)
return;
TimeDelta delta = now - time_last_updated_;
if (paused_) {
pause_time_sum_ += delta;
} else {
queue_time_sum_ += delta * size_packets_;
}
time_last_updated_ = now;
}
void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
if (paused_ == paused)
return;
UpdateAverageQueueTime(now);
paused_ = paused;
}
TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
if (size_packets_ == 0)
return TimeDelta::Zero();
return queue_time_sum_ / size_packets_;
}
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.Ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.Ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.Ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
// If the SSRC is not currently scheduled, add it to `stream_priorities_`.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
} else if (packet.Priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that `priority_` uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.Priority(), stream->size), packet.Ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
// Promotion from single-packet queue. Just add to enqueue times.
packet.UpdateEnqueueTimeIterator(
enqueue_times_.insert(packet.EnqueueTime()));
} else {
// In order to figure out how much time a packet has spent in the queue
// while not in a paused state, we subtract the total amount of time the
// queue has been paused so far, and when the packet is popped we subtract
// the total amount of time the queue has been paused at that moment. This
// way we subtract the total amount of time the packet has spent in the
// queue while in a paused state.
UpdateAverageQueueTime(packet.EnqueueTime());
packet.SubtractPauseTime(pause_time_sum_);
size_packets_ += 1;
size_packets_per_media_type_[static_cast<size_t>(packet.Type())] += 1;
size_ += PacketSize(packet);
}
stream->packet_queue.push(packet);
}
DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const {
DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() +
packet.RtpPacket()->padding_size());
if (include_overhead_) {
packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
transport_overhead_per_packet_;
}
return packet_size;
}
void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() {
if (single_packet_queue_.has_value()) {
Push(*single_packet_queue_);
single_packet_queue_.reset();
}
}
RoundRobinPacketQueue::Stream*
RoundRobinPacketQueue::GetHighestPriorityStream() {
RTC_CHECK(!stream_priorities_.empty());
uint32_t ssrc = stream_priorities_.begin()->second;
auto stream_info_it = streams_.find(ssrc);
RTC_CHECK(stream_info_it != streams_.end());
RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
RTC_CHECK(!stream_info_it->second.packet_queue.empty());
return &stream_info_it->second;
}
bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
for (const auto& scheduled_stream : stream_priorities_) {
if (scheduled_stream.second == ssrc)
return true;
}
return false;
}
} // namespace webrtc

View File

@ -1,172 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#include <stddef.h>
#include <stdint.h>
#include <list>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <unordered_map>
#include "absl/types/optional.h"
#include "api/units/data_size.h"
#include "api/units/time_delta.h"
#include "api/units/timestamp.h"
#include "modules/pacing/pacing_controller.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
namespace webrtc {
class RoundRobinPacketQueue : public PacingController::PacketQueue {
public:
explicit RoundRobinPacketQueue(Timestamp start_time);
~RoundRobinPacketQueue();
void Push(Timestamp enqueue_time,
std::unique_ptr<RtpPacketToSend> packet) override;
std::unique_ptr<RtpPacketToSend> Pop() override;
int SizeInPackets() const override;
DataSize SizeInPayloadBytes() const override;
const std::array<int, kNumMediaTypes>& SizeInPacketsPerRtpPacketMediaType()
const override;
Timestamp LeadingAudioPacketEnqueueTime() const override;
Timestamp OldestEnqueueTime() const override;
TimeDelta AverageQueueTime() const override;
void UpdateAverageQueueTime(Timestamp now) override;
void SetPauseState(bool paused, Timestamp now) override;
private:
struct QueuedPacket {
public:
QueuedPacket(int priority,
Timestamp enqueue_time,
int64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::unique_ptr<RtpPacketToSend> packet);
QueuedPacket(const QueuedPacket& rhs);
~QueuedPacket();
bool operator<(const QueuedPacket& other) const;
int Priority() const;
RtpPacketMediaType Type() const;
uint32_t Ssrc() const;
Timestamp EnqueueTime() const;
bool IsRetransmission() const;
int64_t EnqueueOrder() const;
RtpPacketToSend* RtpPacket() const;
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const;
void UpdateEnqueueTimeIterator(std::multiset<Timestamp>::iterator it);
void SubtractPauseTime(TimeDelta pause_time_sum);
private:
int priority_;
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
int64_t enqueue_order_;
bool is_retransmission_; // Cached for performance.
std::multiset<Timestamp>::iterator enqueue_time_it_;
// Raw pointer since priority_queue doesn't allow for moving
// out of the container.
RtpPacketToSend* owned_packet_;
};
class PriorityPacketQueue : public std::priority_queue<QueuedPacket> {
public:
using const_iterator = container_type::const_iterator;
const_iterator begin() const;
const_iterator end() const;
};
struct StreamPrioKey {
StreamPrioKey(int priority, DataSize size)
: priority(priority), size(size) {}
bool operator<(const StreamPrioKey& other) const {
if (priority != other.priority)
return priority < other.priority;
return size < other.size;
}
const int priority;
const DataSize size;
};
struct Stream {
Stream();
Stream(const Stream&);
virtual ~Stream();
DataSize size;
uint32_t ssrc;
PriorityPacketQueue packet_queue;
// Whenever a packet is inserted for this stream we check if `priority_it`
// points to an element in `stream_priorities_`, and if it does it means
// this stream has already been scheduled, and if the scheduled priority is
// lower than the priority of the incoming packet we reschedule this stream
// with the higher priority.
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
};
void Push(QueuedPacket packet);
DataSize PacketSize(const QueuedPacket& packet) const;
void MaybePromoteSinglePacketToNormalQueue();
Stream* GetHighestPriorityStream();
// Just used to verify correctness.
bool IsSsrcScheduled(uint32_t ssrc) const;
DataSize transport_overhead_per_packet_;
Timestamp time_last_updated_;
int64_t enqueue_count_;
bool paused_;
int size_packets_;
std::array<int, kNumMediaTypes> size_packets_per_media_type_;
DataSize size_;
DataSize max_size_;
TimeDelta queue_time_sum_;
TimeDelta pause_time_sum_;
// A map of streams used to prioritize from which stream to send next. We use
// a multimap instead of a priority_queue since the priority of a stream can
// change as a new packet is inserted, and a multimap allows us to remove and
// then reinsert a StreamPrioKey if the priority has increased.
std::multimap<StreamPrioKey, uint32_t> stream_priorities_;
// A map of SSRCs to Streams.
std::unordered_map<uint32_t, Stream> streams_;
// The enqueue time of every packet currently in the queue. Used to figure out
// the age of the oldest packet in the queue.
std::multiset<Timestamp> enqueue_times_;
absl::optional<QueuedPacket> single_packet_queue_;
bool include_overhead_;
};
} // namespace webrtc
#endif // MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_

View File

@ -1,98 +0,0 @@
/*
* Copyright (c) 2022 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/round_robin_packet_queue.h"
#include <utility>
#include "api/units/timestamp.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/checks.h"
#include "test/gmock.h"
#include "test/gtest.h"
namespace webrtc {
namespace {
constexpr uint32_t kDefaultSsrc = 123;
constexpr int kDefaultPayloadSize = 321;
std::unique_ptr<RtpPacketToSend> CreatePacket(RtpPacketMediaType type,
uint16_t sequence_number) {
auto packet = std::make_unique<RtpPacketToSend>(/*extensions=*/nullptr);
packet->set_packet_type(type);
packet->SetSsrc(kDefaultSsrc);
packet->SetSequenceNumber(sequence_number);
packet->SetPayloadSize(kDefaultPayloadSize);
return packet;
}
} // namespace
TEST(RoundRobinPacketQueueTest,
PushAndPopUpdatesSizeInPacketsPerRtpPacketMediaType) {
Timestamp now = Timestamp::Zero();
RoundRobinPacketQueue queue(now);
// Initially all sizes are zero.
for (size_t i = 0; i < kNumMediaTypes; ++i) {
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0);
}
// Push packets.
queue.Push(now, CreatePacket(RtpPacketMediaType::kAudio, 1));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kAudio)],
1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kVideo, 2));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kVideo)],
1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kRetransmission, 3));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kRetransmission)],
1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kForwardErrorCorrection, 4));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kForwardErrorCorrection)],
1);
queue.Push(now, CreatePacket(RtpPacketMediaType::kPadding, 5));
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
RtpPacketMediaType::kPadding)],
1);
// Now all sizes are 1.
for (size_t i = 0; i < kNumMediaTypes; ++i) {
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 1);
}
// Popping happens in a priority order based on media type. This test does not
// assert what this order is, only that the counter for the popped packet's
// media type is decremented.
for (size_t i = 0; i < kNumMediaTypes; ++i) {
auto popped_packet = queue.Pop();
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[static_cast<size_t>(
popped_packet->packet_type().value())],
0);
}
// We've popped all packets, so all sizes are zero.
for (size_t i = 0; i < kNumMediaTypes; ++i) {
EXPECT_EQ(queue.SizeInPacketsPerRtpPacketMediaType()[i], 0);
}
}
} // namespace webrtc