Adds fast-path optimization for single packet in pacer queue.
Especially with the TaskQueuePacerSender, it is very common that a single packet is added to the queue and then immediately removed as it gets sent to the network. This CL adds a fast-path for that case, that avoid creating book- keeping in the form of stream-priorities and timestamp sets etc. Functionally, it should be a noop, but hopefully it can save a few CPU cycles. Bug: webrtc:10809 Change-Id: Idaa06b4f8d1da444fce78cc742e2ab52f9efe815 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/172090 Commit-Queue: Erik Språng <sprang@webrtc.org> Reviewed-by: Ilya Nikolaevskiy <ilnik@webrtc.org> Cr-Commit-Position: refs/heads/master@{#31001}
This commit is contained in:
@ -78,6 +78,11 @@ RtpPacketToSend* RoundRobinPacketQueue::QueuedPacket::RtpPacket() const {
|
||||
return owned_packet_;
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::QueuedPacket::UpdateEnqueueTimeIterator(
|
||||
std::multiset<Timestamp>::iterator it) {
|
||||
enqueue_time_it_ = it;
|
||||
}
|
||||
|
||||
std::multiset<Timestamp>::iterator
|
||||
RoundRobinPacketQueue::QueuedPacket::EnqueueTimeIterator() const {
|
||||
return enqueue_time_it_;
|
||||
@ -134,11 +139,34 @@ void RoundRobinPacketQueue::Push(int priority,
|
||||
uint64_t enqueue_order,
|
||||
std::unique_ptr<RtpPacketToSend> packet) {
|
||||
RTC_DCHECK(packet->packet_type().has_value());
|
||||
Push(QueuedPacket(priority, enqueue_time, enqueue_order,
|
||||
enqueue_times_.insert(enqueue_time), std::move(packet)));
|
||||
if (size_packets_ == 0) {
|
||||
// Single packet fast-path.
|
||||
single_packet_queue_.emplace(
|
||||
QueuedPacket(priority, enqueue_time, enqueue_order,
|
||||
enqueue_times_.end(), std::move(packet)));
|
||||
UpdateQueueTime(enqueue_time);
|
||||
single_packet_queue_->SubtractPauseTime(pause_time_sum_);
|
||||
size_packets_ = 1;
|
||||
size_ += PacketSize(*single_packet_queue_);
|
||||
} else {
|
||||
MaybePromoteSinglePacketToNormalQueue();
|
||||
Push(QueuedPacket(priority, enqueue_time, enqueue_order,
|
||||
enqueue_times_.insert(enqueue_time), std::move(packet)));
|
||||
}
|
||||
}
|
||||
|
||||
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
||||
if (single_packet_queue_.has_value()) {
|
||||
RTC_DCHECK(stream_priorities_.empty());
|
||||
std::unique_ptr<RtpPacketToSend> rtp_packet(
|
||||
single_packet_queue_->RtpPacket());
|
||||
single_packet_queue_.reset();
|
||||
queue_time_sum_ = TimeDelta::Zero();
|
||||
size_packets_ = 0;
|
||||
size_ = DataSize::Zero();
|
||||
return rtp_packet;
|
||||
}
|
||||
|
||||
RTC_DCHECK(!Empty());
|
||||
Stream* stream = GetHighestPriorityStream();
|
||||
const QueuedPacket& queued_packet = stream->packet_queue.top();
|
||||
@ -163,13 +191,7 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
||||
// case a "budget" will be built up for the stream sending at the lower
|
||||
// rate. To avoid building a too large budget we limit |bytes| to be within
|
||||
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
|
||||
DataSize packet_size =
|
||||
DataSize::Bytes(queued_packet.RtpPacket()->payload_size() +
|
||||
queued_packet.RtpPacket()->padding_size());
|
||||
if (include_overhead_) {
|
||||
packet_size += DataSize::Bytes(queued_packet.RtpPacket()->headers_size()) +
|
||||
transport_overhead_per_packet_;
|
||||
}
|
||||
DataSize packet_size = PacketSize(queued_packet);
|
||||
stream->size =
|
||||
std::max(stream->size + packet_size, max_size_ - kMaxLeadingSize);
|
||||
max_size_ = std::max(max_size_, stream->size);
|
||||
@ -195,9 +217,12 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
||||
}
|
||||
|
||||
bool RoundRobinPacketQueue::Empty() const {
|
||||
RTC_CHECK((!stream_priorities_.empty() && size_packets_ > 0) ||
|
||||
(stream_priorities_.empty() && size_packets_ == 0));
|
||||
return stream_priorities_.empty();
|
||||
if (size_packets_ == 0) {
|
||||
RTC_DCHECK(!single_packet_queue_.has_value() && stream_priorities_.empty());
|
||||
return true;
|
||||
}
|
||||
RTC_DCHECK(single_packet_queue_.has_value() || !stream_priorities_.empty());
|
||||
return false;
|
||||
}
|
||||
|
||||
size_t RoundRobinPacketQueue::SizeInPackets() const {
|
||||
@ -209,6 +234,10 @@ DataSize RoundRobinPacketQueue::Size() const {
|
||||
}
|
||||
|
||||
bool RoundRobinPacketQueue::NextPacketIsAudio() const {
|
||||
if (single_packet_queue_.has_value()) {
|
||||
return single_packet_queue_->Type() == RtpPacketMediaType::kAudio;
|
||||
}
|
||||
|
||||
if (stream_priorities_.empty()) {
|
||||
return false;
|
||||
}
|
||||
@ -220,6 +249,10 @@ bool RoundRobinPacketQueue::NextPacketIsAudio() const {
|
||||
}
|
||||
|
||||
Timestamp RoundRobinPacketQueue::OldestEnqueueTime() const {
|
||||
if (single_packet_queue_.has_value()) {
|
||||
return single_packet_queue_->EnqueueTime();
|
||||
}
|
||||
|
||||
if (Empty())
|
||||
return Timestamp::MinusInfinity();
|
||||
RTC_CHECK(!enqueue_times_.empty());
|
||||
@ -250,6 +283,7 @@ void RoundRobinPacketQueue::SetPauseState(bool paused, Timestamp now) {
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::SetIncludeOverhead() {
|
||||
MaybePromoteSinglePacketToNormalQueue();
|
||||
include_overhead_ = true;
|
||||
// We need to update the size to reflect overhead for existing packets.
|
||||
for (const auto& stream : streams_) {
|
||||
@ -261,6 +295,7 @@ void RoundRobinPacketQueue::SetIncludeOverhead() {
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::SetTransportOverhead(DataSize overhead_per_packet) {
|
||||
MaybePromoteSinglePacketToNormalQueue();
|
||||
if (include_overhead_) {
|
||||
DataSize previous_overhead = transport_overhead_per_packet_;
|
||||
// We need to update the size to reflect overhead for existing packets.
|
||||
@ -304,26 +339,44 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) {
|
||||
}
|
||||
RTC_CHECK(stream->priority_it != stream_priorities_.end());
|
||||
|
||||
// In order to figure out how much time a packet has spent in the queue while
|
||||
// not in a paused state, we subtract the total amount of time the queue has
|
||||
// been paused so far, and when the packet is popped we subtract the total
|
||||
// amount of time the queue has been paused at that moment. This way we
|
||||
// subtract the total amount of time the packet has spent in the queue while
|
||||
// in a paused state.
|
||||
UpdateQueueTime(packet.EnqueueTime());
|
||||
packet.SubtractPauseTime(pause_time_sum_);
|
||||
if (packet.EnqueueTimeIterator() == enqueue_times_.end()) {
|
||||
// Promotion from single-packet queue. Just add to enqueue times.
|
||||
packet.UpdateEnqueueTimeIterator(
|
||||
enqueue_times_.insert(packet.EnqueueTime()));
|
||||
} else {
|
||||
// In order to figure out how much time a packet has spent in the queue
|
||||
// while not in a paused state, we subtract the total amount of time the
|
||||
// queue has been paused so far, and when the packet is popped we subtract
|
||||
// the total amount of time the queue has been paused at that moment. This
|
||||
// way we subtract the total amount of time the packet has spent in the
|
||||
// queue while in a paused state.
|
||||
UpdateQueueTime(packet.EnqueueTime());
|
||||
packet.SubtractPauseTime(pause_time_sum_);
|
||||
|
||||
size_packets_ += 1;
|
||||
size_ += DataSize::Bytes(packet.RtpPacket()->payload_size() +
|
||||
packet.RtpPacket()->padding_size());
|
||||
if (include_overhead_) {
|
||||
size_ += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
|
||||
transport_overhead_per_packet_;
|
||||
size_packets_ += 1;
|
||||
size_ += PacketSize(packet);
|
||||
}
|
||||
|
||||
stream->packet_queue.push(packet);
|
||||
}
|
||||
|
||||
DataSize RoundRobinPacketQueue::PacketSize(const QueuedPacket& packet) const {
|
||||
DataSize packet_size = DataSize::Bytes(packet.RtpPacket()->payload_size() +
|
||||
packet.RtpPacket()->padding_size());
|
||||
if (include_overhead_) {
|
||||
packet_size += DataSize::Bytes(packet.RtpPacket()->headers_size()) +
|
||||
transport_overhead_per_packet_;
|
||||
}
|
||||
return packet_size;
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::MaybePromoteSinglePacketToNormalQueue() {
|
||||
if (single_packet_queue_.has_value()) {
|
||||
Push(*single_packet_queue_);
|
||||
single_packet_queue_.reset();
|
||||
}
|
||||
}
|
||||
|
||||
RoundRobinPacketQueue::Stream*
|
||||
RoundRobinPacketQueue::GetHighestPriorityStream() {
|
||||
RTC_CHECK(!stream_priorities_.empty());
|
||||
|
@ -77,6 +77,7 @@ class RoundRobinPacketQueue {
|
||||
RtpPacketToSend* RtpPacket() const;
|
||||
|
||||
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const;
|
||||
void UpdateEnqueueTimeIterator(std::multiset<Timestamp>::iterator it);
|
||||
void SubtractPauseTime(TimeDelta pause_time_sum);
|
||||
|
||||
private:
|
||||
@ -132,6 +133,9 @@ class RoundRobinPacketQueue {
|
||||
|
||||
void Push(QueuedPacket packet);
|
||||
|
||||
DataSize PacketSize(const QueuedPacket& packet) const;
|
||||
void MaybePromoteSinglePacketToNormalQueue();
|
||||
|
||||
Stream* GetHighestPriorityStream();
|
||||
|
||||
// Just used to verify correctness.
|
||||
@ -161,6 +165,8 @@ class RoundRobinPacketQueue {
|
||||
// the age of the oldest packet in the queue.
|
||||
std::multiset<Timestamp> enqueue_times_;
|
||||
|
||||
absl::optional<QueuedPacket> single_packet_queue_;
|
||||
|
||||
bool include_overhead_;
|
||||
};
|
||||
} // namespace webrtc
|
||||
|
Reference in New Issue
Block a user