Move PacketQueue out of paced_sender.cc to its own packet_queue.{cc,h}.

Bug: webrtc:8287, webrtc:8288
Change-Id: If8937458c5b8f5a75b3de441aa409ae873f4bda2
Reviewed-on: https://webrtc-review.googlesource.com/3761
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Philip Eliasson <philipel@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#20003}
This commit is contained in:
philipel
2017-09-26 17:16:06 +02:00
committed by Commit Bot
parent 02e7a1981a
commit 9981bd928f
5 changed files with 294 additions and 210 deletions

View File

@ -19,6 +19,8 @@ rtc_static_library("pacing") {
"paced_sender.cc",
"paced_sender.h",
"pacer.h",
"packet_queue.cc",
"packet_queue.h",
"packet_router.cc",
"packet_router.h",
]

View File

@ -37,201 +37,7 @@ const int64_t kMaxIntervalTimeMs = 30;
} // namespace
// TODO(sprang): Move at least PacketQueue out to separate files, so that we can
// more easily test them.
namespace webrtc {
namespace paced_sender {
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused.
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
};
// Used by priority queue to sort packets.
struct Comparator {
bool operator()(const Packet* first, const Packet* second) {
// Highest prio = 0.
if (first->priority != second->priority)
return first->priority > second->priority;
// Retransmissions go first.
if (second->retransmission != first->retransmission)
return second->retransmission;
// Older frames have higher prio.
if (first->capture_time_ms != second->capture_time_ms)
return first->capture_time_ms > second->capture_time_ms;
return first->enqueue_order > second->enqueue_order;
}
};
// Class encapsulating a priority queue with some extensions.
class PacketQueue {
public:
explicit PacketQueue(const Clock* clock)
: bytes_(0),
clock_(clock),
queue_time_sum_(0),
time_last_updated_(clock_->TimeInMilliseconds()),
paused_(false) {}
virtual ~PacketQueue() {}
void Push(const Packet& packet) {
if (!AddToDupeSet(packet))
return;
UpdateQueueTime(packet.enqueue_time_ms);
// Store packet in list, use pointers in priority queue for cheaper moves.
// Packets have a handle to its own iterator in the list, for easy removal
// when popping from queue.
packet_list_.push_front(packet);
std::list<Packet>::iterator it = packet_list_.begin();
it->this_it = it; // Handle for direct removal from list.
prio_queue_.push(&(*it)); // Pointer into list.
bytes_ += packet.bytes;
}
const Packet& BeginPop() {
const Packet& packet = *prio_queue_.top();
prio_queue_.pop();
return packet;
}
void CancelPop(const Packet& packet) { prio_queue_.push(&(*packet.this_it)); }
void FinalizePop(const Packet& packet) {
RemoveFromDupeSet(packet);
bytes_ -= packet.bytes;
int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
packet_queue_time_ms -= packet.sum_paused_ms;
RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
queue_time_sum_ -= packet_queue_time_ms;
packet_list_.erase(packet.this_it);
RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
if (packet_list_.empty())
RTC_DCHECK_EQ(0, queue_time_sum_);
}
bool Empty() const { return prio_queue_.empty(); }
size_t SizeInPackets() const { return prio_queue_.size(); }
uint64_t SizeInBytes() const { return bytes_; }
int64_t OldestEnqueueTimeMs() const {
auto it = packet_list_.rbegin();
if (it == packet_list_.rend())
return 0;
return it->enqueue_time_ms;
}
void UpdateQueueTime(int64_t timestamp_ms) {
RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
if (timestamp_ms == time_last_updated_)
return;
int64_t delta_ms = timestamp_ms - time_last_updated_;
if (paused_) {
// Increase per-packet accumulators of time spent in queue while paused,
// so that we can disregard that when subtracting main accumulator when
// popping packet from the queue.
for (auto& it : packet_list_) {
it.sum_paused_ms += delta_ms;
}
} else {
// Use packet packet_list_.size() not prio_queue_.size() here, as there
// might be an outstanding element popped from prio_queue_ currently in
// the SendPacket() call, while packet_list_ will always be correct.
queue_time_sum_ += delta_ms * packet_list_.size();
}
time_last_updated_ = timestamp_ms;
}
void SetPauseState(bool paused, int64_t timestamp_ms) {
if (paused_ == paused)
return;
UpdateQueueTime(timestamp_ms);
paused_ = paused;
}
int64_t AverageQueueTimeMs() const {
if (prio_queue_.empty())
return 0;
return queue_time_sum_ / packet_list_.size();
}
private:
// Try to add a packet to the set of ssrc/seqno identifiers currently in the
// queue. Return true if inserted, false if this is a duplicate.
bool AddToDupeSet(const Packet& packet) {
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
if (it == dupe_map_.end()) {
// First for this ssrc, just insert.
dupe_map_[packet.ssrc].insert(packet.sequence_number);
return true;
}
// Insert returns a pair, where second is a bool set to true if new element.
return it->second.insert(packet.sequence_number).second;
}
void RemoveFromDupeSet(const Packet& packet) {
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
RTC_DCHECK(it != dupe_map_.end());
it->second.erase(packet.sequence_number);
if (it->second.empty()) {
dupe_map_.erase(it);
}
}
// List of packets, in the order the were enqueued. Since dequeueing may
// occur out of order, use list instead of vector.
std::list<Packet> packet_list_;
// Priority queue of the packets, sorted according to Comparator.
// Use pointers into list, to avoid moving whole struct within heap.
std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
// Total number of bytes in the queue.
uint64_t bytes_;
// Map<ssrc, std::set<seq_no> >, for checking duplicates.
typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
SsrcSeqNoMap dupe_map_;
const Clock* const clock_;
int64_t queue_time_sum_;
int64_t time_last_updated_;
bool paused_;
};
} // namespace paced_sender
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@ -253,7 +59,7 @@ PacedSender::PacedSender(const Clock* clock,
pacing_bitrate_kbps_(0),
time_last_update_us_(clock->TimeInMicroseconds()),
first_sent_packet_ms_(-1),
packets_(new paced_sender::PacketQueue(clock)),
packets_(new PacketQueue(clock)),
packet_counter_(0),
pacing_factor_(kDefaultPaceMultiplier),
queue_time_limit(kMaxQueueLengthMs) {
@ -342,9 +148,9 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
if (capture_time_ms < 0)
capture_time_ms = now_ms;
packets_->Push(paced_sender::Packet(priority, ssrc, sequence_number,
capture_time_ms, now_ms, bytes,
retransmission, packet_counter_++));
packets_->Push(PacketQueue::Packet(priority, ssrc, sequence_number,
capture_time_ms, now_ms, bytes,
retransmission, packet_counter_++));
}
int64_t PacedSender::ExpectedQueueTimeMs() const {
@ -455,7 +261,7 @@ void PacedSender::Process() {
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
const paced_sender::Packet& packet = packets_->BeginPop();
const PacketQueue::Packet& packet = packets_->BeginPop();
if (SendPacket(packet, pacing_info)) {
// Send succeeded, remove it from the queue.
@ -496,7 +302,7 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread;
}
bool PacedSender::SendPacket(const paced_sender::Packet& packet,
bool PacedSender::SendPacket(const PacketQueue::Packet& packet,
const PacedPacketInfo& pacing_info) {
RTC_DCHECK(!paused_);
if (media_budget_->bytes_remaining() == 0 &&

View File

@ -11,12 +11,11 @@
#ifndef MODULES_PACING_PACED_SENDER_H_
#define MODULES_PACING_PACED_SENDER_H_
#include <list>
#include <memory>
#include <set>
#include "api/optional.h"
#include "modules/pacing/pacer.h"
#include "modules/pacing/packet_queue.h"
#include "rtc_base/criticalsection.h"
#include "rtc_base/thread_annotations.h"
#include "typedefs.h" // NOLINT(build/include)
@ -29,12 +28,6 @@ class ProbeClusterCreatedObserver;
class RtcEventLog;
class IntervalBudget;
namespace paced_sender {
class IntervalBudget;
struct Packet;
class PacketQueue;
} // namespace paced_sender
class PacedSender : public Pacer {
public:
class PacketSender {
@ -159,7 +152,7 @@ class PacedSender : public Pacer {
void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
bool SendPacket(const paced_sender::Packet& packet,
bool SendPacket(const PacketQueue::Packet& packet,
const PacedPacketInfo& cluster_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
size_t SendPadding(size_t padding_needed, const PacedPacketInfo& cluster_info)
@ -191,7 +184,7 @@ class PacedSender : public Pacer {
int64_t time_last_update_us_ RTC_GUARDED_BY(critsect_);
int64_t first_sent_packet_ms_ RTC_GUARDED_BY(critsect_);
std::unique_ptr<paced_sender::PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
std::unique_ptr<PacketQueue> packets_ RTC_GUARDED_BY(critsect_);
uint64_t packet_counter_;
ProcessThread* process_thread_ = nullptr;

View File

@ -0,0 +1,174 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#include "modules/pacing/packet_queue.h"
#include <algorithm>
#include <list>
#include <vector>
#include "modules/include/module_common_types.h"
#include "modules/pacing/alr_detector.h"
#include "modules/pacing/bitrate_prober.h"
#include "modules/pacing/interval_budget.h"
#include "modules/utility/include/process_thread.h"
#include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "system_wrappers/include/clock.h"
#include "system_wrappers/include/field_trial.h"
namespace webrtc {
PacketQueue::Packet::Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order)
: priority(priority),
ssrc(ssrc),
sequence_number(seq_number),
capture_time_ms(capture_time_ms),
enqueue_time_ms(enqueue_time_ms),
sum_paused_ms(0),
bytes(length_in_bytes),
retransmission(retransmission),
enqueue_order(enqueue_order) {}
PacketQueue::Packet::~Packet() {}
PacketQueue::PacketQueue(const Clock* clock)
: bytes_(0),
clock_(clock),
queue_time_sum_(0),
time_last_updated_(clock_->TimeInMilliseconds()),
paused_(false) {}
PacketQueue::~PacketQueue() {}
void PacketQueue::Push(const Packet& packet) {
if (!AddToDupeSet(packet))
return;
UpdateQueueTime(packet.enqueue_time_ms);
// Store packet in list, use pointers in priority queue for cheaper moves.
// Packets have a handle to its own iterator in the list, for easy removal
// when popping from queue.
packet_list_.push_front(packet);
std::list<Packet>::iterator it = packet_list_.begin();
it->this_it = it; // Handle for direct removal from list.
prio_queue_.push(&(*it)); // Pointer into list.
bytes_ += packet.bytes;
}
const PacketQueue::Packet& PacketQueue::BeginPop() {
const PacketQueue::Packet& packet = *prio_queue_.top();
prio_queue_.pop();
return packet;
}
void PacketQueue::CancelPop(const PacketQueue::Packet& packet) {
prio_queue_.push(&(*packet.this_it));
}
void PacketQueue::FinalizePop(const PacketQueue::Packet& packet) {
RemoveFromDupeSet(packet);
bytes_ -= packet.bytes;
int64_t packet_queue_time_ms = time_last_updated_ - packet.enqueue_time_ms;
RTC_DCHECK_LE(packet.sum_paused_ms, packet_queue_time_ms);
packet_queue_time_ms -= packet.sum_paused_ms;
RTC_DCHECK_LE(packet_queue_time_ms, queue_time_sum_);
queue_time_sum_ -= packet_queue_time_ms;
packet_list_.erase(packet.this_it);
RTC_DCHECK_EQ(packet_list_.size(), prio_queue_.size());
if (packet_list_.empty())
RTC_DCHECK_EQ(0, queue_time_sum_);
}
bool PacketQueue::Empty() const {
return prio_queue_.empty();
}
size_t PacketQueue::SizeInPackets() const {
return prio_queue_.size();
}
uint64_t PacketQueue::SizeInBytes() const {
return bytes_;
}
int64_t PacketQueue::OldestEnqueueTimeMs() const {
auto it = packet_list_.rbegin();
if (it == packet_list_.rend())
return 0;
return it->enqueue_time_ms;
}
void PacketQueue::UpdateQueueTime(int64_t timestamp_ms) {
RTC_DCHECK_GE(timestamp_ms, time_last_updated_);
if (timestamp_ms == time_last_updated_)
return;
int64_t delta_ms = timestamp_ms - time_last_updated_;
if (paused_) {
// Increase per-packet accumulators of time spent in queue while paused,
// so that we can disregard that when subtracting main accumulator when
// popping packet from the queue.
for (auto& it : packet_list_) {
it.sum_paused_ms += delta_ms;
}
} else {
// Use packet packet_list_.size() not prio_queue_.size() here, as there
// might be an outstanding element popped from prio_queue_ currently in
// the SendPacket() call, while packet_list_ will always be correct.
queue_time_sum_ += delta_ms * packet_list_.size();
}
time_last_updated_ = timestamp_ms;
}
void PacketQueue::SetPauseState(bool paused, int64_t timestamp_ms) {
if (paused_ == paused)
return;
UpdateQueueTime(timestamp_ms);
paused_ = paused;
}
int64_t PacketQueue::AverageQueueTimeMs() const {
if (prio_queue_.empty())
return 0;
return queue_time_sum_ / packet_list_.size();
}
bool PacketQueue::AddToDupeSet(const PacketQueue::Packet& packet) {
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
if (it == dupe_map_.end()) {
// First for this ssrc, just insert.
dupe_map_[packet.ssrc].insert(packet.sequence_number);
return true;
}
// Insert returns a pair, where second is a bool set to true if new element.
return it->second.insert(packet.sequence_number).second;
}
void PacketQueue::RemoveFromDupeSet(const PacketQueue::Packet& packet) {
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
RTC_DCHECK(it != dupe_map_.end());
it->second.erase(packet.sequence_number);
if (it->second.empty()) {
dupe_map_.erase(it);
}
}
} // namespace webrtc

View File

@ -0,0 +1,109 @@
/*
* Copyright (c) 2017 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_PACING_PACKET_QUEUE_H_
#define MODULES_PACING_PACKET_QUEUE_H_
#include <list>
#include <map>
#include <queue>
#include <set>
#include <vector>
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
namespace webrtc {
class PacketQueue {
public:
explicit PacketQueue(const Clock* clock);
virtual ~PacketQueue();
struct Packet {
Packet(RtpPacketSender::Priority priority,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order);
virtual ~Packet();
RtpPacketSender::Priority priority;
uint32_t ssrc;
uint16_t sequence_number;
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms; // Sum of time spent in queue while pacer is paused.
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
};
void Push(const Packet& packet);
const Packet& BeginPop();
void CancelPop(const Packet& packet);
void FinalizePop(const Packet& packet);
bool Empty() const;
size_t SizeInPackets() const;
uint64_t SizeInBytes() const;
int64_t OldestEnqueueTimeMs() const;
void UpdateQueueTime(int64_t timestamp_ms);
void SetPauseState(bool paused, int64_t timestamp_ms);
int64_t AverageQueueTimeMs() const;
private:
// Try to add a packet to the set of ssrc/seqno identifiers currently in the
// queue. Return true if inserted, false if this is a duplicate.
bool AddToDupeSet(const Packet& packet);
void RemoveFromDupeSet(const Packet& packet);
// Used by priority queue to sort packets.
struct Comparator {
bool operator()(const Packet* first, const Packet* second) {
// Highest prio = 0.
if (first->priority != second->priority)
return first->priority > second->priority;
// Retransmissions go first.
if (second->retransmission != first->retransmission)
return second->retransmission;
// Older frames have higher prio.
if (first->capture_time_ms != second->capture_time_ms)
return first->capture_time_ms > second->capture_time_ms;
return first->enqueue_order > second->enqueue_order;
}
};
// List of packets, in the order the were enqueued. Since dequeueing may
// occur out of order, use list instead of vector.
std::list<Packet> packet_list_;
// Priority queue of the packets, sorted according to Comparator.
// Use pointers into list, to avodi moving whole struct within heap.
std::priority_queue<Packet*, std::vector<Packet*>, Comparator> prio_queue_;
// Total number of bytes in the queue.
uint64_t bytes_;
// Map<ssrc, std::set<seq_no> >, for checking duplicates.
typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
SsrcSeqNoMap dupe_map_;
const Clock* const clock_;
int64_t queue_time_sum_;
int64_t time_last_updated_;
bool paused_;
};
} // namespace webrtc
#endif // MODULES_PACING_PACKET_QUEUE_H_