Renamed PacketQueue2 to RoundRobinPacketQueue.

The previous name packet queue 2 had no indication on what the
difference was compared to the regular packet queue. This rename makes
it easier to understand the codebase.

Additionally the PacketQueueInterface class was introduced to make the
class hierarchy easier to follow. The round robin packet queue did not
extend the packet queue so there was no reason for inheriting from the
specific implementation.

Bug: None
Change-Id: Idbce081c751fbacd927632f5e71220887d0b5991
Reviewed-on: https://webrtc-review.googlesource.com/49120
Commit-Queue: Sebastian Jansson <srte@webrtc.org>
Reviewed-by: Stefan Holmer <stefan@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#21931}
This commit is contained in:
Sebastian Jansson
2018-02-07 13:26:38 +01:00
committed by Commit Bot
parent 17cdcbb57b
commit b537496520
9 changed files with 191 additions and 116 deletions

View File

@ -19,10 +19,12 @@ rtc_static_library("pacing") {
"pacer.h",
"packet_queue.cc",
"packet_queue.h",
"packet_queue2.cc",
"packet_queue2.h",
"packet_queue_interface.cc",
"packet_queue_interface.h",
"packet_router.cc",
"packet_router.h",
"round_robin_packet_queue.cc",
"round_robin_packet_queue.h",
]
if (!build_with_chromium && is_clang) {

View File

@ -20,6 +20,8 @@
#include "modules/include/module_common_types.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/pacing/packet_queue.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
@ -52,18 +54,29 @@ namespace webrtc {
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
namespace {
std::unique_ptr<PacketQueueInterface> CreatePacketQueue(const Clock* clock,
bool round_robin) {
if (round_robin) {
return rtc::MakeUnique<RoundRobinPacketQueue>(clock);
} else {
return rtc::MakeUnique<PacketQueue>(clock);
}
}
} // namespace
PacedSender::PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log) :
PacedSender(clock, packet_sender, event_log,
IsRoundRobinPacingEnabled()
? rtc::MakeUnique<PacketQueue2>(clock)
: rtc::MakeUnique<PacketQueue>(clock)) {}
RtcEventLog* event_log)
: PacedSender(clock,
packet_sender,
event_log,
CreatePacketQueue(clock, IsRoundRobinPacingEnabled())) {}
PacedSender::PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
std::unique_ptr<PacketQueue> packets)
std::unique_ptr<PacketQueueInterface> packets)
: clock_(clock),
packet_sender_(packet_sender),
paused_(false),

View File

@ -15,7 +15,7 @@
#include "api/optional.h"
#include "modules/pacing/pacer.h"
#include "modules/pacing/packet_queue2.h"
#include "modules/pacing/packet_queue_interface.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/thread_annotations.h"
#include "typedefs.h" // NOLINT(build/include)
@ -62,7 +62,7 @@ class PacedSender : public Pacer {
PacedSender(const Clock* clock,
PacketSender* packet_sender,
RtcEventLog* event_log,
std::unique_ptr<PacketQueue> packets);
std::unique_ptr<PacketQueueInterface> packets);
~PacedSender() override;
@ -129,7 +129,7 @@ class PacedSender : public Pacer {
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacket(const PacketQueue::Packet& packet,
bool SendPacket(const PacketQueueInterface::Packet& packet,
const PacedPacketInfo& cluster_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
@ -159,7 +159,8 @@ class PacedSender : public Pacer {
int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_);
int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
const std::unique_ptr<PacketQueue> packets_ RTC_PT_GUARDED_BY(critsect_);
const std::unique_ptr<PacketQueueInterface> packets_
RTC_PT_GUARDED_BY(critsect_);
uint64_t packet_counter_ RTC_GUARDED_BY(critsect_);
// Lock to avoid race when attaching process thread. This can happen due to

View File

@ -25,28 +25,6 @@
namespace webrtc {
PacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
PacketQueue::Packet::Packet(const Packet& other) = default;
PacketQueue::Packet::~Packet() {}
PacketQueue::PacketQueue(const Clock* clock)
: bytes_(0),
clock_(clock),
@ -69,17 +47,17 @@ void PacketQueue::Push(const Packet& packet) {
bytes_ += packet.bytes;
}
const PacketQueue::Packet& PacketQueue::BeginPop() {
const PacketQueue::Packet& packet = *prio_queue_.top();
const PacketQueueInterface::Packet& PacketQueue::BeginPop() {
const Packet& packet = *prio_queue_.top();
prio_queue_.pop();
return packet;
}
void PacketQueue::CancelPop(const PacketQueue::Packet& packet) {
void PacketQueue::CancelPop(const Packet& packet) {
prio_queue_.push(&(*packet.this_it));
}
void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) {
void PacketQueue::FinalizePop(const Packet& packet) {
bytes_ -= packet.bytes;
int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);

View File

@ -16,62 +16,29 @@
#include <set>
#include <vector>
#include "modules/pacing/packet_queue_interface.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueue {
class PacketQueue : public PacketQueueInterface {
public:
explicit PacketQueue(const Clock* clock);
virtual ~PacketQueue();
~PacketQueue() override;
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order);
using Packet = PacketQueueInterface::Packet;
Packet(const Packet& other);
virtual ~Packet();
bool operator<(const Packet& other) const {
if (priority != other.priority)
return priority > other.priority;
if (retransmission != other.retransmission)
return other.retransmission;
return enqueue_order > other.enqueue_order;
}
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms;
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
std::multiset<int64_t>::iterator enqueue_time_it;
};
virtual void Push(const Packet& packet);
virtual const Packet& BeginPop();
virtual void CancelPop(const Packet& packet);
virtual void FinalizePop(const Packet& packet);
virtual bool Empty() const;
virtual size_t SizeInPackets() const;
virtual uint64_t SizeInBytes() const;
virtual int64_t OldestEnqueueTimeMs() const;
virtual void UpdateQueueTime(int64_t timestamp_ms);
virtual void SetPauseState(bool paused, int64_t timestamp_ms);
virtual int64_t AverageQueueTimeMs() const;
void Push(const Packet& packet) override;
const Packet& BeginPop() override;
void CancelPop(const Packet& packet) override;
void FinalizePop(const Packet& packet) override;
bool Empty() const override;
size_t SizeInPackets() const override;
uint64_t SizeInBytes() const override;
int64_t OldestEnqueueTimeMs() const override;
void UpdateQueueTime(int64_t timestamp_ms) override;
void SetPauseState(bool paused, int64_t timestamp_ms) override;
int64_t AverageQueueTimeMs() const override;
private:
// Try to add a packet to the set of ssrc/seqno identifiers currently in the

View File

@ -0,0 +1,46 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/packet_queue_interface.h"
namespace webrtc {
PacketQueueInterface::Packet::Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
PacketQueueInterface::Packet::Packet(const Packet& other) = default;
PacketQueueInterface::Packet::~Packet() {}
bool PacketQueueInterface::Packet::operator<(
const PacketQueueInterface::Packet& other) const {
if (priority != other.priority)
return priority > other.priority;
if (retransmission != other.retransmission)
return other.retransmission;
return enqueue_order > other.enqueue_order;
}
} // namespace webrtc

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2018 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACKET_QUEUE_INTERFACE_H_
#define MODULES_PACING_PACKET_QUEUE_INTERFACE_H_
#include <stdint.h>
#include <list>
#include <queue>
#include <set>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueueInterface {
public:
PacketQueueInterface() = default;
virtual ~PacketQueueInterface() = default;
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order);
Packet(const Packet& other);
virtual ~Packet();
bool operator<(const Packet& other) const;
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms;
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
std::multiset<int64_t>::iterator enqueue_time_it;
};
virtual void Push(const Packet& packet) = 0;
virtual const Packet& BeginPop() = 0;
virtual void CancelPop(const Packet& packet) = 0;
virtual void FinalizePop(const Packet& packet) = 0;
virtual bool Empty() const = 0;
virtual size_t SizeInPackets() const = 0;
virtual uint64_t SizeInBytes() const = 0;
virtual int64_t OldestEnqueueTimeMs() const = 0;
virtual void UpdateQueueTime(int64_t timestamp_ms) = 0;
virtual void SetPauseState(bool paused, int64_t timestamp_ms) = 0;
virtual int64_t AverageQueueTimeMs() const = 0;
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_QUEUE_INTERFACE_H_

View File

@ -8,7 +8,7 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/packet_queue2.h"
#include "modules/pacing/round_robin_packet_queue.h"
#include <algorithm>
@ -17,17 +17,15 @@
namespace webrtc {
PacketQueue2::Stream::Stream() : bytes(0) {}
PacketQueue2::Stream::~Stream() {}
RoundRobinPacketQueue::Stream::Stream() : bytes(0) {}
RoundRobinPacketQueue::Stream::~Stream() {}
PacketQueue2::PacketQueue2(const Clock* clock)
: PacketQueue(clock),
clock_(clock),
time_last_updated_(clock_->TimeInMilliseconds()) {}
RoundRobinPacketQueue::RoundRobinPacketQueue(const Clock* clock)
: clock_(clock), time_last_updated_(clock_->TimeInMilliseconds()) {}
PacketQueue2::~PacketQueue2() {}
RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
void PacketQueue2::Push(const Packet& packet_to_insert) {
void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) {
Packet packet(packet_to_insert);
auto stream_info_it = streams_.find(packet.ssrc);
@ -70,7 +68,7 @@ void PacketQueue2::Push(const Packet& packet_to_insert) {
size_bytes_ += packet.bytes;
}
const PacketQueue2::Packet& PacketQueue2::BeginPop() {
const PacketQueueInterface::Packet& RoundRobinPacketQueue::BeginPop() {
RTC_CHECK(!pop_packet_ && !pop_stream_);
Stream* stream = GetHighestPriorityStream();
@ -81,14 +79,14 @@ const PacketQueue2::Packet& PacketQueue2::BeginPop() {
return *pop_packet_;
}
void PacketQueue2::CancelPop(const Packet& packet) {
void RoundRobinPacketQueue::CancelPop(const Packet& packet) {
RTC_CHECK(pop_packet_ && pop_stream_);
(*pop_stream_)->packet_queue.push(*pop_packet_);
pop_packet_.reset();
pop_stream_.reset();
}
void PacketQueue2::FinalizePop(const Packet& packet) {
void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
RTC_CHECK(!paused_);
if (!Empty()) {
RTC_CHECK(pop_packet_ && pop_stream_);
@ -137,28 +135,28 @@ void PacketQueue2::FinalizePop(const Packet& packet) {
}
}
bool PacketQueue2::Empty() const {
bool RoundRobinPacketQueue::Empty() const {
RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
(stream_priorities_.empty() && size_packets_ == 0));
return stream_priorities_.empty();
}
size_t PacketQueue2::SizeInPackets() const {
size_t RoundRobinPacketQueue::SizeInPackets() const {
return size_packets_;
}
uint64_t PacketQueue2::SizeInBytes() const {
uint64_t RoundRobinPacketQueue::SizeInBytes() const {
return size_bytes_;
}
int64_t PacketQueue2::OldestEnqueueTimeMs() const {
int64_t RoundRobinPacketQueue::OldestEnqueueTimeMs() const {
if (Empty())
return 0;
RTC_CHECK(!enqueue_times_.empty());
return *enqueue_times_.begin();
}
void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) {
void RoundRobinPacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
RTC_CHECK_GE(timestamp_ms, time_last_updated_);
if (timestamp_ms == time_last_updated_)
return;
@ -174,20 +172,21 @@ void PacketQueue2::UpdateQueueTime(int64_t timestamp_ms) {
time_last_updated_ = timestamp_ms;
}
void PacketQueue2::SetPauseState(bool paused, int64_t timestamp_ms) {
void RoundRobinPacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
if (paused_ == paused)
return;
UpdateQueueTime(timestamp_ms);
paused_ = paused;
}
int64_t PacketQueue2::AverageQueueTimeMs() const {
int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const {
if (Empty())
return 0;
return queue_time_sum_ms_ / size_packets_;
}
PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() {
RoundRobinPacketQueue::Stream*
RoundRobinPacketQueue::GetHighestPriorityStream() {
RTC_CHECK(!stream_priorities_.empty());
uint32_t ssrc = stream_priorities_.begin()->second;
@ -198,7 +197,7 @@ PacketQueue2::Stream* PacketQueue2::GetHighestPriorityStream() {
return &stream_info_it->second;
}
bool PacketQueue2::IsSsrcScheduled(uint32_t ssrc) const {
bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
for (const auto& scheduled_stream : stream_priorities_) {
if (scheduled_stream.second == ssrc)
return true;

View File

@ -8,24 +8,24 @@
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACKET_QUEUE2_H_
#define MODULES_PACING_PACKET_QUEUE2_H_
#ifndef MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#define MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_
#include <map>
#include <queue>
#include <set>
#include "modules/pacing/packet_queue.h"
#include "modules/pacing/packet_queue_interface.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueue2 : public PacketQueue {
class RoundRobinPacketQueue : public PacketQueueInterface {
public:
explicit PacketQueue2(const Clock* clock);
~PacketQueue2() override;
explicit RoundRobinPacketQueue(const Clock* clock);
~RoundRobinPacketQueue() override;
using Packet = PacketQueue::Packet;
using Packet = PacketQueueInterface::Packet;
void Push(const Packet& packet) override;
const Packet& BeginPop() override;
@ -41,6 +41,7 @@ class PacketQueue2 : public PacketQueue {
void UpdateQueueTime(int64_t timestamp_ms) override;
void SetPauseState(bool paused, int64_t timestamp_ms) override;
private:
struct StreamPrioKey {
StreamPrioKey() = default;
StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes)
@ -73,7 +74,6 @@ class PacketQueue2 : public PacketQueue {
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
};
private:
static constexpr size_t kMaxLeadingBytes = 1400;
Stream* GetHighestPriorityStream();
@ -108,4 +108,4 @@ class PacketQueue2 : public PacketQueue {
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_QUEUE2_H_
#endif // MODULES_PACING_ROUND_ROBIN_PACKET_QUEUE_H_