diff --git a/modules/pacing/pacing_controller.cc b/modules/pacing/pacing_controller.cc index f762b96158..5e18d25652 100644 --- a/modules/pacing/pacing_controller.cc +++ b/modules/pacing/pacing_controller.cc @@ -18,6 +18,7 @@ #include "absl/strings/match.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" +#include "modules/pacing/round_robin_packet_queue.h" #include "rtc_base/checks.h" #include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/logging.h" @@ -96,8 +97,9 @@ PacingController::PacingController(Clock* clock, pacing_bitrate_(DataRate::Zero()), last_process_time_(clock->CurrentTime()), last_send_time_(last_process_time_), - packet_queue_(last_process_time_), - packet_counter_(0), + seen_first_packet_(false), + packet_queue_( + std::make_unique(last_process_time_)), congested_(false), queue_time_limit_(kMaxExpectedQueueLength), account_for_audio_(false), @@ -123,14 +125,14 @@ void PacingController::Pause() { if (!paused_) RTC_LOG(LS_INFO) << "PacedSender paused."; paused_ = true; - packet_queue_.SetPauseState(true, CurrentTime()); + packet_queue_->SetPauseState(true, CurrentTime()); } void PacingController::Resume() { if (paused_) RTC_LOG(LS_INFO) << "PacedSender resumed."; paused_ = false; - packet_queue_.SetPauseState(false, CurrentTime()); + packet_queue_->SetPauseState(false, CurrentTime()); } bool PacingController::IsPaused() const { @@ -162,7 +164,7 @@ Timestamp PacingController::CurrentTime() const { } void PacingController::SetProbingEnabled(bool enabled) { - RTC_CHECK_EQ(0, packet_counter_); + RTC_CHECK(!seen_first_packet_); prober_.SetEnabled(enabled); } @@ -196,7 +198,7 @@ void PacingController::EnqueuePacket(std::unique_ptr packet) { prober_.OnIncomingPacket(DataSize::Bytes(packet->payload_size())); Timestamp now = CurrentTime(); - if (mode_ == ProcessMode::kDynamic && packet_queue_.Empty()) { + if (mode_ == ProcessMode::kDynamic && packet_queue_->Empty()) { // If queue is empty, we need to "fast-forward" the last process time, // so that we don't use passed time as budget for sending the first new // packet. @@ -209,7 +211,8 @@ void PacingController::EnqueuePacket(std::unique_ptr packet) { } UpdateBudgetWithElapsedTime(UpdateTimeAndGetElapsed(target_process_time)); } - packet_queue_.Push(now, packet_counter_++, std::move(packet)); + packet_queue_->Push(now, std::move(packet)); + seen_first_packet_ = true; } void PacingController::SetAccountForAudioPackets(bool account_for_audio) { @@ -218,14 +221,12 @@ void PacingController::SetAccountForAudioPackets(bool account_for_audio) { void PacingController::SetIncludeOverhead() { include_overhead_ = true; - packet_queue_.SetIncludeOverhead(); } void PacingController::SetTransportOverhead(DataSize overhead_per_packet) { if (ignore_transport_overhead_) return; transport_overhead_per_packet_ = overhead_per_packet; - packet_queue_.SetTransportOverhead(overhead_per_packet); } TimeDelta PacingController::ExpectedQueueTime() const { @@ -236,11 +237,16 @@ TimeDelta PacingController::ExpectedQueueTime() const { } size_t PacingController::QueueSizePackets() const { - return packet_queue_.SizeInPackets(); + return packet_queue_->SizeInPackets(); } DataSize PacingController::QueueSizeData() const { - return packet_queue_.Size(); + DataSize size = packet_queue_->SizeInPayloadBytes(); + if (include_overhead_) { + size += static_cast(packet_queue_->SizeInPackets()) * + transport_overhead_per_packet_; + } + return size; } DataSize PacingController::CurrentBufferLevel() const { @@ -252,7 +258,7 @@ absl::optional PacingController::FirstSentPacketTime() const { } Timestamp PacingController::OldestPacketEnqueueTime() const { - return packet_queue_.OldestEnqueueTime(); + return packet_queue_->OldestEnqueueTime(); } TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { @@ -273,8 +279,7 @@ TimeDelta PacingController::UpdateTimeAndGetElapsed(Timestamp now) { } bool PacingController::ShouldSendKeepalive(Timestamp now) const { - if (send_padding_if_silent_ || paused_ || congested_ || - packet_counter_ == 0) { + if (send_padding_if_silent_ || paused_ || congested_ || !seen_first_packet_) { // We send a padding packet every 500 ms to ensure we won't get stuck in // congested state due to no feedback being received. if (now - last_send_time_ >= kCongestedPacketInterval) { @@ -310,22 +315,22 @@ Timestamp PacingController::NextSendTime() const { // Not pacing audio, if leading packet is audio its target send // time is the time at which it was enqueued. - absl::optional unpaced_audio_time = - pace_audio_ ? absl::nullopt - : packet_queue_.LeadingAudioPacketEnqueueTime(); - if (unpaced_audio_time) { - return *unpaced_audio_time; + Timestamp unpaced_audio_time = + pace_audio_ ? Timestamp::PlusInfinity() + : packet_queue_->LeadingAudioPacketEnqueueTime(); + if (unpaced_audio_time.IsFinite()) { + return unpaced_audio_time; } - if (congested_ || packet_counter_ == 0) { + if (congested_ || !seen_first_packet_) { // We need to at least send keep-alive packets with some interval. return last_send_time_ + kCongestedPacketInterval; } - if (media_rate_ > DataRate::Zero() && !packet_queue_.Empty()) { + if (media_rate_ > DataRate::Zero() && !packet_queue_->Empty()) { // Check how long until we can send the next media packet. next_send_time = last_process_time_ + media_debt_ / media_rate_; - } else if (padding_rate_ > DataRate::Zero() && packet_queue_.Empty()) { + } else if (padding_rate_ > DataRate::Zero() && packet_queue_->Empty()) { // If we _don't_ have pending packets, check how long until we have // bandwidth for padding packets. Both media and padding debts must // have been drained to do this. @@ -361,7 +366,7 @@ void PacingController::ProcessPackets() { DataSize keepalive_data_sent = DataSize::Zero(); // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. - if (packet_counter_ > 0) { + if (seen_first_packet_) { std::vector> keepalive_packets = packet_sender_->GeneratePadding(DataSize::Bytes(1)); for (auto& packet : keepalive_packets) { @@ -397,16 +402,16 @@ void PacingController::ProcessPackets() { if (elapsed_time > TimeDelta::Zero()) { DataRate target_rate = pacing_bitrate_; - DataSize queue_size_data = packet_queue_.Size(); + DataSize queue_size_data = QueueSizeData(); if (queue_size_data > DataSize::Zero()) { // Assuming equal size packets and input/output rate, the average packet // has avg_time_left_ms left to get queue_size_bytes out of the queue, if // time constraint shall be met. Determine bitrate needed for that. - packet_queue_.UpdateQueueTime(now); + packet_queue_->UpdateAverageQueueTime(now); if (drain_large_queues_) { TimeDelta avg_time_left = std::max(TimeDelta::Millis(1), - queue_time_limit_ - packet_queue_.AverageQueueTime()); + queue_time_limit_ - packet_queue_->AverageQueueTime()); DataRate min_rate_needed = queue_size_data / avg_time_left; if (min_rate_needed > target_rate) { target_rate = min_rate_needed; @@ -545,7 +550,7 @@ void PacingController::ProcessPackets() { DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, DataSize data_sent) const { - if (!packet_queue_.Empty()) { + if (!packet_queue_->Empty()) { // Actual payload available, no need to add padding. return DataSize::Zero(); } @@ -555,7 +560,7 @@ DataSize PacingController::PaddingToAdd(DataSize recommended_probe_size, return DataSize::Zero(); } - if (packet_counter_ == 0) { + if (!seen_first_packet_) { // We can not send padding unless a normal packet has first been sent. If // we do, timestamps get messed up. return DataSize::Zero(); @@ -597,7 +602,7 @@ std::unique_ptr PacingController::GetPendingPacket( } } - if (packet_queue_.Empty()) { + if (packet_queue_->Empty()) { return nullptr; } @@ -605,7 +610,7 @@ std::unique_ptr PacingController::GetPendingPacket( // Unpaced audio packets and probes are exempted from send checks. bool unpaced_audio_packet = - !pace_audio_ && packet_queue_.LeadingAudioPacketEnqueueTime().has_value(); + !pace_audio_ && packet_queue_->LeadingAudioPacketEnqueueTime().IsFinite(); if (!unpaced_audio_packet && !is_probe) { if (congested_) { // Don't send anything if congested. @@ -631,7 +636,7 @@ std::unique_ptr PacingController::GetPendingPacket( } } - return packet_queue_.Pop(); + return packet_queue_->Pop(); } void PacingController::OnPacketSent(RtpPacketMediaType packet_type, diff --git a/modules/pacing/pacing_controller.h b/modules/pacing/pacing_controller.h index c87154d1c2..85b494474f 100644 --- a/modules/pacing/pacing_controller.h +++ b/modules/pacing/pacing_controller.h @@ -25,7 +25,6 @@ #include "api/transport/network_types.h" #include "modules/pacing/bitrate_prober.h" #include "modules/pacing/interval_budget.h" -#include "modules/pacing/round_robin_packet_queue.h" #include "modules/pacing/rtp_packet_pacer.h" #include "modules/rtp_rtcp/include/rtp_packet_sender.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" @@ -60,6 +59,46 @@ class PacingController { DataSize size) = 0; }; + // Interface for class hanlding storage of and prioritization of packets + // pending to be sent by the pacer. + // Note that for the methods taking a Timestamp as parameter, the parameter + // will never decrease between two subsequent calls. + class PacketQueue { + public: + virtual ~PacketQueue() = default; + + virtual void Push(Timestamp enqueue_time, + std::unique_ptr packet) = 0; + virtual std::unique_ptr Pop() = 0; + + virtual size_t SizeInPackets() const = 0; + bool Empty() const { return SizeInPackets() == 0; } + virtual DataSize SizeInPayloadBytes() const = 0; + + // If the next packet, that would be returned by Pop() if called + // now, is an audio packet this method returns the enqueue time + // of that packet. If queue is empty or top packet is not audio, + // returns Timestamp::MinusInfinity(). + virtual Timestamp LeadingAudioPacketEnqueueTime() const = 0; + + // Enqueue time of the oldest packet in the queue, + // Timestamp::MinusInfinity() if queue is empty. + virtual Timestamp OldestEnqueueTime() const = 0; + + // Average queue time for the packets currently in the queue. + // The queuing time is calculated from Push() to the last UpdateQueueTime() + // call - with any time spent in a paused state subtracted. + virtual TimeDelta AverageQueueTime() const = 0; + + // Called during packet processing or when pause stats changes. Since the + // AverageQueueTime() method does not look at the wall time, this method + // needs to be called before querying queue time. + virtual void UpdateAverageQueueTime(Timestamp now) = 0; + + // Set the pause state, while `paused` is true queuing time is not counted. + virtual void SetPauseState(bool paused, Timestamp now) = 0; + }; + // Expected max pacer delay. If ExpectedQueueTime() is higher than // this value, the packet producers should wait (eg drop frames rather than // encoding them). Bitrate sent may temporarily exceed target set by @@ -219,9 +258,9 @@ class PacingController { Timestamp last_process_time_; Timestamp last_send_time_; absl::optional first_sent_packet_time_; + bool seen_first_packet_; - RoundRobinPacketQueue packet_queue_; - uint64_t packet_counter_; + std::unique_ptr packet_queue_; bool congested_; diff --git a/modules/pacing/round_robin_packet_queue.cc b/modules/pacing/round_robin_packet_queue.cc index b372d66973..02d4caaa4c 100644 --- a/modules/pacing/round_robin_packet_queue.cc +++ b/modules/pacing/round_robin_packet_queue.cc @@ -53,7 +53,7 @@ RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default; RoundRobinPacketQueue::QueuedPacket::QueuedPacket( int priority, Timestamp enqueue_time, - uint64_t enqueue_order, + int64_t enqueue_order, std::multiset::iterator enqueue_time_it, std::unique_ptr packet) : priority_(priority), @@ -94,7 +94,7 @@ bool RoundRobinPacketQueue::QueuedPacket::IsRetransmission() const { return Type() == RtpPacketMediaType::kRetransmission; } -uint64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const { +int64_t RoundRobinPacketQueue::QueuedPacket::EnqueueOrder() const { return enqueue_order_; } @@ -134,6 +134,7 @@ RoundRobinPacketQueue::Stream::~Stream() = default; RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time) : transport_overhead_per_packet_(DataSize::Zero()), time_last_updated_(start_time), + enqueue_count_(0), paused_(false), size_packets_(0), size_(DataSize::Zero()), @@ -144,28 +145,27 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(Timestamp start_time) RoundRobinPacketQueue::~RoundRobinPacketQueue() { // Make sure to release any packets owned by raw pointer in QueuedPacket. - while (!Empty()) { + while (size_packets_ > 0) { Pop(); } } void RoundRobinPacketQueue::Push(Timestamp enqueue_time, - uint64_t enqueue_order, std::unique_ptr packet) { RTC_DCHECK(packet->packet_type().has_value()); int priority = GetPriorityForType(*packet->packet_type()); if (size_packets_ == 0) { // Single packet fast-path. single_packet_queue_.emplace( - QueuedPacket(priority, enqueue_time, enqueue_order, + QueuedPacket(priority, enqueue_time, enqueue_count_++, enqueue_times_.end(), std::move(packet))); - UpdateQueueTime(enqueue_time); + UpdateAverageQueueTime(enqueue_time); single_packet_queue_->SubtractPauseTime(pause_time_sum_); size_packets_ = 1; size_ += PacketSize(*single_packet_queue_); } else { MaybePromoteSinglePacketToNormalQueue(); - Push(QueuedPacket(priority, enqueue_time, enqueue_order, + Push(QueuedPacket(priority, enqueue_time, enqueue_count_++, enqueue_times_.insert(enqueue_time), std::move(packet))); } } @@ -182,7 +182,7 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { return rtp_packet; } - RTC_DCHECK(!Empty()); + RTC_DCHECK_GT(size_packets_, 0u); Stream* stream = GetHighestPriorityStream(); const QueuedPacket& queued_packet = stream->packet_queue.top(); @@ -231,34 +231,24 @@ std::unique_ptr RoundRobinPacketQueue::Pop() { return rtp_packet; } -bool RoundRobinPacketQueue::Empty() const { - if (size_packets_ == 0) { - RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty()); - return true; - } - RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty()); - return false; -} - size_t RoundRobinPacketQueue::SizeInPackets() const { return size_packets_; } -DataSize RoundRobinPacketQueue::Size() const { +DataSize RoundRobinPacketQueue::SizeInPayloadBytes() const { return size_; } -absl::optional RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() - const { +Timestamp RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() const { if (single_packet_queue_.has_value()) { if (single_packet_queue_->Type() == RtpPacketMediaType::kAudio) { return single_packet_queue_->EnqueueTime(); } - return absl::nullopt; + return Timestamp::MinusInfinity(); } if (stream_priorities_.empty()) { - return absl::nullopt; + return Timestamp::MinusInfinity(); } uint32_t ssrc = stream_priorities_.begin()->second; @@ -266,7 +256,7 @@ absl::optional RoundRobinPacketQueue::LeadingAudioPacketEnqueueTime() if (top_packet.Type() == RtpPacketMediaType::kAudio) { return top_packet.EnqueueTime(); } - return absl::nullopt; + return Timestamp::MinusInfinity(); } Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { @@ -274,13 +264,13 @@ Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const { return single_packet_queue_->EnqueueTime(); } - if (Empty()) + if (size_packets_ == 0) return Timestamp::MinusInfinity(); RTC_CHECK(!enqueue_times_.empty()); return *enqueue_times_.begin(); } -void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) { +void RoundRobinPacketQueue::UpdateAverageQueueTime(Timestamp now) { RTC_CHECK_GE(now, time_last_updated_); if (now == time_last_updated_) return; @@ -299,38 +289,12 @@ void RoundRobinPacketQueue::UpdateQueueTime(Timestamp now) { void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) { if (paused_ == paused) return; - UpdateQueueTime(now); + UpdateAverageQueueTime(now); paused_ = paused; } -void RoundRobinPacketQueue::SetIncludeOverhead() { - MaybePromoteSinglePacketToNormalQueue(); - include_overhead_ = true; - // We need to update the size to reflect overhead for existing packets. - for (const auto& stream : streams_) { - for (const QueuedPacket& packet : stream.second.packet_queue) { - size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) + - transport_overhead_per_packet_; - } - } -} - -void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) { - MaybePromoteSinglePacketToNormalQueue(); - if (include_overhead_) { - DataSize previous_overhead = transport_overhead_per_packet_; - // We need to update the size to reflect overhead for existing packets. - for (const auto& stream : streams_) { - int packets = stream.second.packet_queue.size(); - size_ -= packets * previous_overhead; - size_ += packets * overhead_per_packet; - } - } - transport_overhead_per_packet_ = overhead_per_packet; -} - TimeDelta RoundRobinPacketQueue::AverageQueueTime() const { - if (Empty()) + if (size_packets_ == 0) return TimeDelta::Zero(); return queue_time_sum_ / size_packets_; } @@ -371,7 +335,7 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) { // the total amount of time the queue has been paused at that moment. This // way we subtract the total amount of time the packet has spent in the // queue while in a paused state. - UpdateQueueTime(packet.EnqueueTime()); + UpdateAverageQueueTime(packet.EnqueueTime()); packet.SubtractPauseTime(pause_time_sum_); size_packets_ += 1; diff --git a/modules/pacing/round_robin_packet_queue.h b/modules/pacing/round_robin_packet_queue.h index 38149f40f0..c099943c47 100644 --- a/modules/pacing/round_robin_packet_queue.h +++ b/modules/pacing/round_robin_packet_queue.h @@ -25,44 +25,36 @@ #include "api/units/data_size.h" #include "api/units/time_delta.h" #include "api/units/timestamp.h" +#include "modules/pacing/pacing_controller.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "system_wrappers/include/clock.h" namespace webrtc { -class RoundRobinPacketQueue { +class RoundRobinPacketQueue : public PacingController::PacketQueue { public: explicit RoundRobinPacketQueue(Timestamp start_time); ~RoundRobinPacketQueue(); void Push(Timestamp enqueue_time, - uint64_t enqueue_order, - std::unique_ptr packet); - std::unique_ptr Pop(); + std::unique_ptr packet) override; + std::unique_ptr Pop() override; - bool Empty() const; - size_t SizeInPackets() const; - DataSize Size() const; - // If the next packet, that would be returned by Pop() if called - // now, is an audio packet this method returns the enqueue time - // of that packet. If queue is empty or top packet is not audio, - // returns nullopt. - absl::optional LeadingAudioPacketEnqueueTime() const; - - Timestamp OldestEnqueueTime() const; - TimeDelta AverageQueueTime() const; - void UpdateQueueTime(Timestamp now); - void SetPauseState(bool paused, Timestamp now); - void SetIncludeOverhead(); - void SetTransportOverhead(DataSize overhead_per_packet); + size_t SizeInPackets() const override; + DataSize SizeInPayloadBytes() const override; + Timestamp LeadingAudioPacketEnqueueTime() const override; + Timestamp OldestEnqueueTime() const override; + TimeDelta AverageQueueTime() const override; + void UpdateAverageQueueTime(Timestamp now) override; + void SetPauseState(bool paused, Timestamp now) override; private: struct QueuedPacket { public: QueuedPacket(int priority, Timestamp enqueue_time, - uint64_t enqueue_order, + int64_t enqueue_order, std::multiset::iterator enqueue_time_it, std::unique_ptr packet); QueuedPacket(const QueuedPacket& rhs); @@ -75,7 +67,7 @@ class RoundRobinPacketQueue { uint32_t Ssrc() const; Timestamp EnqueueTime() const; bool IsRetransmission() const; - uint64_t EnqueueOrder() const; + int64_t EnqueueOrder() const; RtpPacketToSend* RtpPacket() const; std::multiset::iterator EnqueueTimeIterator() const; @@ -85,7 +77,7 @@ class RoundRobinPacketQueue { private: int priority_; Timestamp enqueue_time_; // Absolute time of pacer queue entry. - uint64_t enqueue_order_; + int64_t enqueue_order_; bool is_retransmission_; // Cached for performance. std::multiset::iterator enqueue_time_it_; // Raw pointer since priority_queue doesn't allow for moving @@ -147,6 +139,8 @@ class RoundRobinPacketQueue { Timestamp time_last_updated_; + int64_t enqueue_count_; + bool paused_; size_t size_packets_; DataSize size_;