Revert "Simplify pacer queue"

This reverts commit 7db900e2e78d1644a173a0bc505ad52c61c43f9b.

Reason for revert: Speculative revert

Original change's description:
> Simplify pacer queue
> 
> This CL simplifies the pacer queue by removing the now unnecessary
> beginpop/cancelpop/finalizepop methods. Instead there's a const top()
> and a pop() much like an stl queue.
> Old methods using the deprecated pacing code path are cleaned away.
> 
> Bug: webrtc:10633
> Change-Id: Ib6da4d46a571bf56415172b790cc9e3f63206a38
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150522
> Commit-Queue: Erik Språng <sprang@webrtc.org>
> Reviewed-by: Philip Eliasson <philipel@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#28997}

TBR=sprang@webrtc.org,philipel@webrtc.org

# Not skipping CQ checks because original CL landed > 1 day ago.

Bug: webrtc:10633
Change-Id: I38f61afed4f4d542e236bcce3152a3aab52c6e6b
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/151120
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Erik Språng <sprang@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#29030}
This commit is contained in:
Erik Språng
2019-09-01 12:26:44 +00:00
committed by Commit Bot
parent ce6a0c8fb3
commit f660e81a56
5 changed files with 239 additions and 198 deletions

View File

@ -88,8 +88,6 @@ PacingController::PacingController(Clock* clock,
send_padding_if_silent_(
IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")),
pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")),
send_side_bwe_with_overhead_(
IsEnabled(*field_trials_, "WebRTC-SendSideBwe-WithOverhead")),
min_packet_limit_(kDefaultMinPacketLimit),
last_timestamp_(clock_->CurrentTime()),
paused_(false),
@ -157,33 +155,6 @@ bool PacingController::Congested() const {
return false;
}
DataSize PacingController::PacketSize(const RtpPacketToSend& packet) const {
return DataSize::bytes(send_side_bwe_with_overhead_
? packet.size()
: packet.payload_size() + packet.padding_size());
}
bool PacingController::ShouldSendPacket(const RtpPacketToSend& packet,
PacedPacketInfo pacing_info) const {
if (!pace_audio_ && packet.packet_type() == RtpPacketToSend::Type::kAudio) {
// If audio, and we don't pace audio, pop packet regardless.
return true;
}
// Pacing applies, check if we can.
if (Congested()) {
// Don't try to send more packets while we are congested.
return false;
} else if (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
// No budget left, and not a probe (which can override budget levels),
// don't pop this packet.
return false;
}
// No blocks for sending packets found!
return true;
}
Timestamp PacingController::CurrentTime() const {
Timestamp time = clock_->CurrentTime();
if (time < last_timestamp_) {
@ -226,8 +197,7 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
RTC_CHECK(packet->packet_type());
int priority = GetPriorityForType(*packet->packet_type());
DataSize size = PacketSize(*packet);
packet_queue_.Push(priority, now, packet_counter_++, size, std::move(packet));
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
}
void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
@ -334,7 +304,7 @@ void PacingController::ProcessPackets() {
// Assuming equal size packets and input/output rate, the average packet
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
// time constraint shall be met. Determine bitrate needed for that.
packet_queue_.UpdateQueueTime(now);
packet_queue_.UpdateQueueTime(CurrentTime());
if (drain_large_queues_) {
TimeDelta avg_time_left =
std::max(TimeDelta::ms(1),
@ -364,15 +334,8 @@ void PacingController::ProcessPackets() {
// The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code.
while (!paused_) {
std::unique_ptr<RtpPacketToSend> rtp_packet;
if (!packet_queue_.Empty()) {
const RtpPacketToSend& stored_packet = packet_queue_.Top();
if (ShouldSendPacket(stored_packet, pacing_info)) {
rtp_packet = packet_queue_.Pop();
}
}
if (rtp_packet == nullptr) {
auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr) {
// No packet available to send, check if we should send padding.
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
if (padding_to_add > DataSize::Zero()) {
@ -393,25 +356,13 @@ void PacingController::ProcessPackets() {
break;
}
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
RTC_DCHECK(rtp_packet);
const DataSize packet_size = PacketSize(*rtp_packet);
const bool audio_packet =
rtp_packet->packet_type() == RtpPacketToSend::Type::kAudio;
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
data_sent += packet_size;
if (!first_sent_packet_time_) {
first_sent_packet_time_ = now;
}
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
UpdateBudgetWithSentData(packet_size);
last_send_time_ = now;
}
padding_failure_state_ = false;
data_sent += packet->size();
// Send succeeded, remove it from the queue.
OnPacketSent(packet);
if (recommended_probe_size && data_sent > *recommended_probe_size)
break;
}
@ -453,6 +404,44 @@ DataSize PacingController::PaddingToAdd(
return DataSize::bytes(padding_budget_.bytes_remaining());
}
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
const PacedPacketInfo& pacing_info) {
if (packet_queue_.Empty()) {
return nullptr;
}
// Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails.
RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
bool apply_pacing = !audio_packet || pace_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) {
packet_queue_.CancelPop();
return nullptr;
}
return packet;
}
void PacingController::OnPacketSent(
RoundRobinPacketQueue::QueuedPacket* packet) {
Timestamp now = CurrentTime();
if (!first_sent_packet_time_) {
first_sent_packet_time_ = now;
}
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
if (!audio_packet || account_for_audio_) {
// Update media bytes sent.
UpdateBudgetWithSentData(packet->size());
last_send_time_ = now;
}
// Send succeeded, remove it from the queue.
packet_queue_.FinalizePop();
padding_failure_state_ = false;
}
void PacingController::OnPaddingSent(DataSize data_sent) {
if (data_sent > DataSize::Zero()) {
UpdateBudgetWithSentData(data_sent);

View File

@ -145,10 +145,10 @@ class PacingController {
DataSize PaddingToAdd(absl::optional<DataSize> recommended_probe_size,
DataSize data_sent);
RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
const PacedPacketInfo& pacing_info);
void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet);
void OnPaddingSent(DataSize padding_sent);
DataSize PacketSize(const RtpPacketToSend& packet) const;
bool ShouldSendPacket(const RtpPacketToSend& packet,
PacedPacketInfo pacing_info) const;
Timestamp CurrentTime() const;
@ -160,7 +160,6 @@ class PacingController {
const bool drain_large_queues_;
const bool send_padding_if_silent_;
const bool pace_audio_;
const bool send_side_bwe_with_overhead_;
TimeDelta min_packet_limit_;
// TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.

View File

@ -27,13 +27,22 @@ RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
int priority,
RtpPacketToSend::Type type,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it)
: priority_(priority),
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it)
: type_(type),
priority_(priority),
ssrc_(ssrc),
sequence_number_(seq_number),
capture_time_ms_(capture_time_ms),
enqueue_time_(enqueue_time),
size_(size),
retransmission_(retransmission),
@ -43,10 +52,7 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
if (packet_it_->get() != nullptr) {
return std::move(*packet_it_);
}
return nullptr;
return packet_it_ ? std::move(**packet_it_) : nullptr;
}
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
@ -68,6 +74,13 @@ RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
RoundRobinPacketQueue::Stream::~Stream() {}
bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
if (!field_trials) {
return false;
}
return field_trials->Lookup(name).find("Enabled") == 0;
}
RoundRobinPacketQueue::RoundRobinPacketQueue(
Timestamp start_time,
const WebRtcKeyValueConfig* field_trials)
@ -77,42 +90,71 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(
size_(DataSize::Zero()),
max_size_(kMaxLeadingSize),
queue_time_sum_(TimeDelta::Zero()),
pause_time_sum_(TimeDelta::Zero()) {}
pause_time_sum_(TimeDelta::Zero()),
send_side_bwe_with_overhead_(
IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
void RoundRobinPacketQueue::Push(int priority,
RtpPacketToSend::Type type,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order) {
Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
enqueue_time, size, retransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), absl::nullopt));
}
void RoundRobinPacketQueue::Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
DataSize size,
std::unique_ptr<RtpPacketToSend> packet) {
const uint32_t ssrc = packet->Ssrc();
const bool retransmission =
packet->packet_type() == RtpPacketToSend::Type::kRetransmission;
uint32_t ssrc = packet->Ssrc();
uint16_t sequence_number = packet->SequenceNumber();
int64_t capture_time_ms = packet->capture_time_ms();
DataSize size =
DataSize::bytes(send_side_bwe_with_overhead_
? packet->size()
: packet->payload_size() + packet->padding_size());
auto type = packet->packet_type();
RTC_DCHECK(type.has_value());
rtp_packets_.push_front(std::move(packet));
Push(QueuedPacket(priority, enqueue_time, size, retransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()),
ssrc);
Push(QueuedPacket(
priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
}
const RtpPacketToSend& RoundRobinPacketQueue::Top() const {
RTC_DCHECK(!Empty());
const Stream& stream = GetHighestPriorityStream();
auto packet = stream.packet_queue.top();
return packet.get_packet();
}
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
RTC_DCHECK(!Empty());
RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
RTC_CHECK(!pop_packet_ && !pop_stream_);
Stream* stream = GetHighestPriorityStream();
auto packet = stream->packet_queue.top();
pop_stream_.emplace(stream);
pop_packet_.emplace(stream->packet_queue.top());
stream->packet_queue.pop();
std::unique_ptr<RtpPacketToSend> rtp_packet = packet.ReleasePacket();
return &pop_packet_.value();
}
void RoundRobinPacketQueue::CancelPop() {
RTC_CHECK(pop_packet_ && pop_stream_);
(*pop_stream_)->packet_queue.push(*pop_packet_);
pop_packet_.reset();
pop_stream_.reset();
}
void RoundRobinPacketQueue::FinalizePop() {
if (!Empty()) {
RTC_CHECK(pop_packet_ && pop_stream_);
Stream* stream = *pop_stream_;
stream_priorities_.erase(stream->priority_it);
const QueuedPacket& packet = *pop_packet_;
// Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
@ -126,7 +168,10 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
enqueue_times_.erase(packet.EnqueueTimeIterator());
rtp_packets_.erase(packet.PacketIterator());
auto packet_it = packet.PacketIterator();
if (packet_it) {
rtp_packets_.erase(*packet_it);
}
// Update |bytes| of this stream. The general idea is that the stream that
// has sent the least amount of bytes should have the highest priority.
@ -152,7 +197,9 @@ std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
StreamPrioKey(priority, stream->size), stream->ssrc);
}
return rtp_packet;
pop_packet_.reset();
pop_stream_.reset();
}
}
bool RoundRobinPacketQueue::Empty() const {
@ -205,12 +252,12 @@ TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
return queue_time_sum_ / size_packets_;
}
void RoundRobinPacketQueue::Push(QueuedPacket packet, uint32_t ssrc) {
auto stream_info_it = streams_.find(ssrc);
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(ssrc, Stream()).first;
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = ssrc;
stream_info_it->second.ssrc = packet.ssrc();
}
Stream* stream = &stream_info_it->second;
@ -219,14 +266,14 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet, uint32_t ssrc) {
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), ssrc);
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
} else if (packet.priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->size), ssrc);
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
@ -257,18 +304,6 @@ RoundRobinPacketQueue::GetHighestPriorityStream() {
return &stream_info_it->second;
}
const RoundRobinPacketQueue::Stream&
RoundRobinPacketQueue::GetHighestPriorityStream() const {
RTC_CHECK(!stream_priorities_.empty());
uint32_t ssrc = stream_priorities_.begin()->second;
auto stream_info_it = streams_.find(ssrc);
RTC_CHECK(stream_info_it != streams_.end());
RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
RTC_CHECK(!stream_info_it->second.packet_queue.empty());
return stream_info_it->second;
}
bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
for (const auto& scheduled_stream : stream_priorities_) {
if (scheduled_stream.second == ssrc)

View File

@ -37,18 +37,80 @@ class RoundRobinPacketQueue {
const WebRtcKeyValueConfig* field_trials);
~RoundRobinPacketQueue();
struct QueuedPacket {
public:
QueuedPacket(
int priority,
RtpPacketToSend::Type type,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it);
QueuedPacket(const QueuedPacket& rhs);
~QueuedPacket();
bool operator<(const QueuedPacket& other) const;
int priority() const { return priority_; }
RtpPacketToSend::Type type() const { return type_; }
uint32_t ssrc() const { return ssrc_; }
uint16_t sequence_number() const { return sequence_number_; }
int64_t capture_time_ms() const { return capture_time_ms_; }
Timestamp enqueue_time() const { return enqueue_time_; }
DataSize size() const { return size_; }
bool is_retransmission() const { return retransmission_; }
uint64_t enqueue_order() const { return enqueue_order_; }
std::unique_ptr<RtpPacketToSend> ReleasePacket();
// For internal use.
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
PacketIterator() const {
return packet_it_;
}
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
return enqueue_time_it_;
}
void SubtractPauseTime(TimeDelta pause_time_sum);
private:
RtpPacketToSend::Type type_;
int priority_;
uint32_t ssrc_;
uint16_t sequence_number_;
int64_t capture_time_ms_; // Absolute time of frame capture.
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
DataSize size_;
bool retransmission_;
uint64_t enqueue_order_;
std::multiset<Timestamp>::iterator enqueue_time_it_;
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
// if applicable.
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it_;
};
void Push(int priority,
RtpPacketToSend::Type type,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order);
void Push(int priority,
Timestamp enqueue_time,
uint64_t enqueue_order,
DataSize size,
std::unique_ptr<RtpPacketToSend> packet);
// Peek at the next packet in line to be sent. Note that this method must not
// be called unless Empty() returns false.
const RtpPacketToSend& Top() const;
// Remove the highest prio element from the queue and return it.
std::unique_ptr<RtpPacketToSend> Pop();
QueuedPacket* BeginPop();
void CancelPop();
void FinalizePop();
bool Empty() const;
size_t SizeInPackets() const;
@ -60,53 +122,6 @@ class RoundRobinPacketQueue {
void SetPauseState(bool paused, Timestamp now);
private:
struct QueuedPacket {
public:
QueuedPacket(
int priority,
Timestamp enqueue_time,
DataSize size,
bool retransmission,
uint64_t enqueue_order,
std::multiset<Timestamp>::iterator enqueue_time_it,
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it);
QueuedPacket(const QueuedPacket& rhs);
~QueuedPacket();
bool operator<(const QueuedPacket& other) const;
int priority() const { return priority_; }
Timestamp enqueue_time() const { return enqueue_time_; }
DataSize size() const { return size_; }
bool is_retransmission() const { return retransmission_; }
uint64_t enqueue_order() const { return enqueue_order_; }
std::unique_ptr<RtpPacketToSend> ReleasePacket();
// For internal use.
std::list<std::unique_ptr<RtpPacketToSend>>::iterator PacketIterator()
const {
return packet_it_;
}
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
return enqueue_time_it_;
}
void SubtractPauseTime(TimeDelta pause_time_sum);
const RtpPacketToSend& get_packet() const {
RTC_DCHECK(packet_it_->get() != nullptr);
return **packet_it_;
}
int priority_;
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
DataSize size_;
bool retransmission_;
uint64_t enqueue_order_;
std::multiset<Timestamp>::iterator enqueue_time_it_;
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned.
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it_;
};
struct StreamPrioKey {
StreamPrioKey(int priority, DataSize size)
: priority(priority), size(size) {}
@ -139,15 +154,16 @@ class RoundRobinPacketQueue {
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
};
void Push(QueuedPacket packet, uint32_t ssrc);
void Push(QueuedPacket packet);
Stream* GetHighestPriorityStream();
const Stream& GetHighestPriorityStream() const;
// Just used to verify correctness.
bool IsSsrcScheduled(uint32_t ssrc) const;
Timestamp time_last_updated_;
absl::optional<QueuedPacket> pop_packet_;
absl::optional<Stream*> pop_stream_;
bool paused_;
size_t size_packets_;
@ -174,6 +190,8 @@ class RoundRobinPacketQueue {
// end iterator of this list if queue does not have direct ownership of the
// packet.
std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
const bool send_side_bwe_with_overhead_;
};
} // namespace webrtc

View File

@ -57,14 +57,14 @@ class RtpPacketToSend : public RtpPacket {
void set_retransmitted_sequence_number(uint16_t sequence_number) {
retransmitted_sequence_number_ = sequence_number;
}
absl::optional<uint16_t> retransmitted_sequence_number() const {
absl::optional<uint16_t> retransmitted_sequence_number() {
return retransmitted_sequence_number_;
}
void set_allow_retransmission(bool allow_retransmission) {
allow_retransmission_ = allow_retransmission;
}
bool allow_retransmission() const { return allow_retransmission_; }
bool allow_retransmission() { return allow_retransmission_; }
// Additional data bound to the RTP packet for use in application code,
// outside of WebRTC.