Removes legacy PacketQueue implementation.

Also cleans up usage of the new RoundRobinPacketQueue to reduce code
bloat.

Bug: webrtc:8288
Change-Id: I90f17a4422b32c1d4e2d7d5065573157346d6a0b
Reviewed-on: https://webrtc-review.googlesource.com/100306
Reviewed-by: Philip Eliasson <philipel@webrtc.org>
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#24744}
This commit is contained in:
Sebastian Jansson
2018-09-13 17:11:06 +02:00
committed by Commit Bot
parent c7d935899a
commit 60570dc8c4
9 changed files with 91 additions and 382 deletions

View File

@ -15,10 +15,6 @@ rtc_static_library("pacing") {
"paced_sender.cc",
"paced_sender.h",
"pacer.h",
"packet_queue.cc",
"packet_queue.h",
"packet_queue_interface.cc",
"packet_queue_interface.h",
"packet_router.cc",
"packet_router.h",
"round_robin_packet_queue.cc",

View File

@ -22,7 +22,6 @@
#include "modules/include/module_common_types.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
@ -51,15 +50,6 @@ const float PacedSender::kDefaultPaceMultiplier = 2.5f;
PacedSender::PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log)
: PacedSender(clock,
packet_sender,
event_log,
absl::make_unique<RoundRobinPacketQueue>(clock)) {}
PacedSender::PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
std::unique_ptr<PacketQueueInterface> packets)
: clock_(clock),
packet_sender_(packet_sender),
alr_detector_(absl::make_unique<AlrDetector>(event_log)),
@ -80,7 +70,7 @@ PacedSender::PacedSender(const Clock* clock,
time_last_process_us_(clock->TimeInMicroseconds()),
last_send_time_us_(clock->TimeInMicroseconds()),
first_sent_packet_ms_(-1),
packets_(std::move(packets)),
packets_(clock),
packet_counter_(0),
pacing_factor_(kDefaultPaceMultiplier),
queue_time_limit(kMaxQueueLengthMs),
@ -104,7 +94,7 @@ void PacedSender::Pause() {
if (!paused_)
RTC_LOG(LS_INFO) << "PacedSender paused.";
paused_ = true;
packets_->SetPauseState(true, TimeMilliseconds());
packets_.SetPauseState(true, TimeMilliseconds());
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to get
@ -119,7 +109,7 @@ void PacedSender::Resume() {
if (paused_)
RTC_LOG(LS_INFO) << "PacedSender resumed.";
paused_ = false;
packets_->SetPauseState(false, TimeMilliseconds());
packets_.SetPauseState(false, TimeMilliseconds());
}
rtc::CritScope cs(&process_thread_lock_);
// Tell the process thread to call our TimeUntilNextProcess() method to
@ -212,7 +202,7 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
if (capture_time_ms < 0)
capture_time_ms = now_ms;
packets_->Push(PacketQueueInterface::Packet(
packets_.Push(RoundRobinPacketQueue::Packet(
priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes,
retransmission, packet_counter_++));
}
@ -225,7 +215,7 @@ void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
int64_t PacedSender::ExpectedQueueTimeMs() const {
rtc::CritScope cs(&critsect_);
RTC_DCHECK_GT(pacing_bitrate_kbps_, 0);
return static_cast<int64_t>(packets_->SizeInBytes() * 8 /
return static_cast<int64_t>(packets_.SizeInBytes() * 8 /
pacing_bitrate_kbps_);
}
@ -237,7 +227,7 @@ absl::optional<int64_t> PacedSender::GetApplicationLimitedRegionStartTime()
size_t PacedSender::QueueSizePackets() const {
rtc::CritScope cs(&critsect_);
return packets_->SizeInPackets();
return packets_.SizeInPackets();
}
int64_t PacedSender::FirstSentPacketTimeMs() const {
@ -248,7 +238,7 @@ int64_t PacedSender::FirstSentPacketTimeMs() const {
int64_t PacedSender::QueueInMs() const {
rtc::CritScope cs(&critsect_);
int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
int64_t oldest_packet = packets_.OldestEnqueueTimeMs();
if (oldest_packet == 0)
return 0;
@ -303,15 +293,15 @@ void PacedSender::Process() {
if (elapsed_time_ms > 0) {
int target_bitrate_kbps = pacing_bitrate_kbps_;
size_t queue_size_bytes = packets_->SizeInBytes();
size_t queue_size_bytes = packets_.SizeInBytes();
if (queue_size_bytes > 0) {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packets_->UpdateQueueTime(TimeMilliseconds());
packets_.UpdateQueueTime(TimeMilliseconds());
if (drain_large_queues_) {
int64_t avg_time_left_ms = std::max<int64_t>(
1, queue_time_limit - packets_->AverageQueueTimeMs());
1, queue_time_limit - packets_.AverageQueueTimeMs());
int min_bitrate_needed_kbps =
static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
if (min_bitrate_needed_kbps > target_bitrate_kbps)
@ -333,26 +323,26 @@ void PacedSender::Process() {
}
// The paused state is checked in the loop since SendPacket leaves the
// critical section allowing the paused state to be changed from other code.
while (!packets_->Empty() && !paused_) {
while (!packets_.Empty() && !paused_) {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const PacketQueueInterface::Packet& packet = packets_->BeginPop();
const RoundRobinPacketQueue::Packet& packet = packets_.BeginPop();
if (SendPacket(packet, pacing_info)) {
bytes_sent += packet.bytes;
// Send succeeded, remove it from the queue.
packets_->FinalizePop(packet);
packets_.FinalizePop(packet);
if (is_probing && bytes_sent > recommended_probe_size)
break;
} else {
// Send failed, put it back into the queue.
packets_->CancelPop(packet);
packets_.CancelPop(packet);
break;
}
}
if (packets_->Empty() && !Congested()) {
if (packets_.Empty() && !Congested()) {
// We can not send padding unless a normal packet has first been sent. If we
// do, timestamps get messed up.
if (packet_counter_ > 0) {
@ -378,7 +368,7 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread;
}
bool PacedSender::SendPacket(const PacketQueueInterface::Packet& packet,
bool PacedSender::SendPacket(const RoundRobinPacketQueue::Packet& packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(!paused_);
bool audio_packet = packet.priority == kHighPriority;

View File

@ -15,7 +15,7 @@
#include "absl/types/optional.h"
#include "modules/pacing/pacer.h"
#include "modules/pacing/packet_queue_interface.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/thread_annotations.h"
@ -65,11 +65,6 @@ class PacedSender : public Pacer {
PacketSender* packet_sender,
RtcEventLog* event_log);
PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
std::unique_ptr<PacketQueueInterface> packets);
~PacedSender() override;
virtual void CreateProbeCluster(int bitrate_bps);
@ -149,7 +144,7 @@ class PacedSender : public Pacer {
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacket(const PacketQueueInterface::Packet& packet,
bool SendPacket(const RoundRobinPacketQueue::Packet& packet,
const PacedPacketInfo& cluster_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
@ -195,8 +190,7 @@ class PacedSender : public Pacer {
int64_t last_send_time_us_ RTC_GUARDED_BY(critsect_);
int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
const std::unique_ptr<PacketQueueInterface> packets_
RTC_PT_GUARDED_BY(critsect_);
RoundRobinPacketQueue packets_ RTC_GUARDED_BY(critsect_);
uint64_t packet_counter_ RTC_GUARDED_BY(critsect_);
int64_t congestion_window_bytes_ RTC_GUARDED_BY(critsect_) =

View File

@ -1,128 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/packet_queue.h"
#include <algorithm>
#include <list>
#include <vector>
#include "modules/include/module_common_types.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
PacketQueue::PacketQueue(const Clock* clock)
: bytes_(0),
clock_(clock),
queue_time_sum_(0),
time_last_updated_(clock_->TimeInMilliseconds()),
paused_(false) {}
PacketQueue::~PacketQueue() {}
void PacketQueue::Push(const Packet& packet) {
UpdateQueueTime(packet.enqueue_time_ms);
// Store packet in list, use pointers in priority queue for cheaper moves.
// Packets have a handle to its own iterator in the list, for easy removal
// when popping from queue.
packet_list_.push_front(packet);
std::list<Packet>::iterator it = packet_list_.begin();
it->this_it = it; // Handle for direct removal from list.
prio_queue_.push(&(*it)); // Pointer into list.
bytes_ += packet.bytes;
}
const PacketQueueInterface::Packet& PacketQueue::BeginPop() {
const Packet& packet = *prio_queue_.top();
prio_queue_.pop();
return packet;
}
void PacketQueue::CancelPop(const Packet& packet) {
prio_queue_.push(&(*packet.this_it));
}
void PacketQueue::FinalizePop(const Packet& packet) {
bytes_ -= packet.bytes;
int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
packet_queue_time_ms -= packet.sum_paused_ms;
RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
queue_time_sum_ -= packet_queue_time_ms;
packet_list_.erase(packet.this_it);
RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
if (packet_list_.empty())
RTC_DCHECK_EQ(0, queue_time_sum_);
}
bool PacketQueue::Empty() const {
return prio_queue_.empty();
}
size_t PacketQueue::SizeInPackets() const {
return prio_queue_.size();
}
uint64_t PacketQueue::SizeInBytes() const {
return bytes_;
}
int64_t PacketQueue::OldestEnqueueTimeMs() const {
auto it = packet_list_.rbegin();
if (it == packet_list_.rend())
return 0;
return it->enqueue_time_ms;
}
void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
if (timestamp_ms == time_last_updated_)
return;
int64_t delta_ms = timestamp_ms - time_last_updated_;
if (paused_) {
// Increase per-packet accumulators of time spent in queue while paused,
// so that we can disregard that when subtracting main accumulator when
// popping packet from the queue.
for (auto& it : packet_list_) {
it.sum_paused_ms += delta_ms;
}
} else {
// Use packet packet_list_.size() not prio_queue_.size() here, as there
// might be an outstanding element popped from prio_queue_ currently in
// the SendPacket() call, while packet_list_ will always be correct.
queue_time_sum_ += delta_ms * packet_list_.size();
}
time_last_updated_ = timestamp_ms;
}
void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
if (paused_ == paused)
return;
UpdateQueueTime(timestamp_ms);
paused_ = paused;
}
int64_t PacketQueue::AverageQueueTimeMs() const {
if (prio_queue_.empty())
return 0;
return queue_time_sum_ / packet_list_.size();
}
} // namespace webrtc

View File

@ -1,84 +0,0 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACKET_QUEUE_H_
#define MODULES_PACING_PACKET_QUEUE_H_
#include <list>
#include <queue>
#include <set>
#include <vector>
#include "modules/pacing/packet_queue_interface.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueue : public PacketQueueInterface {
public:
explicit PacketQueue(const Clock* clock);
~PacketQueue() override;
using Packet = PacketQueueInterface::Packet;
void Push(const Packet& packet) override;
const Packet& BeginPop() override;
void CancelPop(const Packet& packet) override;
void FinalizePop(const Packet& packet) override;
bool Empty() const override;
size_t SizeInPackets() const override;
uint64_t SizeInBytes() const override;
int64_t OldestEnqueueTimeMs() const override;
void UpdateQueueTime(int64_t timestamp_ms) override;
void SetPauseState(bool paused, int64_t timestamp_ms) override;
int64_t AverageQueueTimeMs() const override;
private:
// Try to add a packet to the set of ssrc/seqno identifiers currently in the
// queue. Return true if inserted, false if this is a duplicate.
bool AddToDupeSet(const Packet& packet);
void RemoveFromDupeSet(const Packet& packet);
// Used by priority queue to sort packets.
struct Comparator {
bool operator()(const Packet* first, const Packet* second) {
// Highest prio = 0.
if (first->priority != second->priority)
return first->priority > second->priority;
// Retransmissions go first.
if (second->retransmission != first->retransmission)
return second->retransmission;
// Older frames have higher prio.
if (first->capture_time_ms != second->capture_time_ms)
return first->capture_time_ms > second->capture_time_ms;
return first->enqueue_order > second->enqueue_order;
}
};
// List of packets, in the order the were enqueued. Since dequeueing may
// occur out of order, use list instead of vector.
std::list<Packet> packet_list_;
// Priority queue of the packets, sorted according to Comparator.
// Use pointers into list, to avodi moving whole struct within heap.
std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
// Total number of bytes in the queue.
uint64_t bytes_;
const Clock* const clock_;
int64_t queue_time_sum_;
int64_t time_last_updated_;
bool paused_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_QUEUE_H_

View File

@ -1,46 +0,0 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/packet_queue_interface.h"
namespace webrtc {
PacketQueueInterface::Packet::Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
PacketQueueInterface::Packet::Packet(const Packet& other) = default;
PacketQueueInterface::Packet::~Packet() {}
bool PacketQueueInterface::Packet::operator<(
const PacketQueueInterface::Packet& other) const {
if (priority != other.priority)
return priority > other.priority;
if (retransmission != other.retransmission)
return other.retransmission;
return enqueue_order > other.enqueue_order;
}
} // namespace webrtc

View File

@ -1,69 +0,0 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACKET_QUEUE_INTERFACE_H_
#define MODULES_PACING_PACKET_QUEUE_INTERFACE_H_
#include <stdint.h>
#include <list>
#include <queue>
#include <set>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueueInterface {
public:
PacketQueueInterface() = default;
virtual ~PacketQueueInterface() = default;
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order);
Packet(const Packet& other);
virtual ~Packet();
bool operator<(const Packet& other) const;
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms;
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
std::multiset<int64_t>::iterator enqueue_time_it;
};
virtual void Push(const Packet& packet) = 0;
virtual const Packet& BeginPop() = 0;
virtual void CancelPop(const Packet& packet) = 0;
virtual void FinalizePop(const Packet& packet) = 0;
virtual bool Empty() const = 0;
virtual size_t SizeInPackets() const = 0;
virtual uint64_t SizeInBytes() const = 0;
virtual int64_t OldestEnqueueTimeMs() const = 0;
virtual void UpdateQueueTime(int64_t timestamp_ms) = 0;
virtual void SetPauseState(bool paused, int64_t timestamp_ms) = 0;
virtual int64_t AverageQueueTimeMs() const = 0;
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_QUEUE_INTERFACE_H_

View File

@ -17,6 +17,38 @@
namespace webrtc {
RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default;
RoundRobinPacketQueue::Packet::~Packet() {}
bool RoundRobinPacketQueue::Packet::operator<(
const RoundRobinPacketQueue::Packet& other) const {
if (priority != other.priority)
return priority > other.priority;
if (retransmission != other.retransmission)
return other.retransmission;
return enqueue_order > other.enqueue_order;
}
RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
RoundRobinPacketQueue::Stream::~Stream() {}
@ -69,7 +101,7 @@ void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) {
size_bytes_ += packet.bytes;
}
const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() {
const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() {
RTC_CHECK(!pop_packet_ && !pop_stream_);
Stream* stream = GetHighestPriorityStream();

View File

@ -11,35 +11,59 @@
#ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#include <list>
#include <map>
#include <queue>
#include <set>
#include "modules/pacing/packet_queue_interface.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class RoundRobinPacketQueue : public PacketQueueInterface {
class RoundRobinPacketQueue {
public:
explicit RoundRobinPacketQueue(const Clock* clock);
~RoundRobinPacketQueue() override;
~RoundRobinPacketQueue();
using Packet = PacketQueueInterface::Packet;
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order);
Packet(const Packet& other);
virtual ~Packet();
bool operator<(const Packet& other) const;
void Push(const Packet& packet) override;
const Packet& BeginPop() override;
void CancelPop(const Packet& packet) override;
void FinalizePop(const Packet& packet) override;
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms;
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
std::multiset<int64_t>::iterator enqueue_time_it;
};
bool Empty() const override;
size_t SizeInPackets() const override;
uint64_t SizeInBytes() const override;
void Push(const Packet& packet);
const Packet& BeginPop();
void CancelPop(const Packet& packet);
void FinalizePop(const Packet& packet);
int64_t OldestEnqueueTimeMs() const override;
int64_t AverageQueueTimeMs() const override;
void UpdateQueueTime(int64_t timestamp_ms) override;
void SetPauseState(bool paused, int64_t timestamp_ms) override;
bool Empty() const;
size_t SizeInPackets() const;
uint64_t SizeInBytes() const;
int64_t OldestEnqueueTimeMs() const;
int64_t AverageQueueTimeMs() const;
void UpdateQueueTime(int64_t timestamp_ms);
void SetPauseState(bool paused, int64_t timestamp_ms);
private:
struct StreamPrioKey {