Simplify pacer queue
This CL simplifies the pacer queue by removing the now unnecessary beginpop/cancelpop/finalizepop methods. Instead there's a const top() and a pop() much like an stl queue. Old methods using the deprecated pacing code path are cleaned away. Bug: webrtc:10633 Change-Id: Ib6da4d46a571bf56415172b790cc9e3f63206a38 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/150522 Commit-Queue: Erik Språng <sprang@webrtc.org> Reviewed-by: Philip Eliasson <philipel@webrtc.org> Cr-Commit-Position: refs/heads/master@{#28997}
This commit is contained in:
@ -88,6 +88,8 @@ PacingController::PacingController(Clock* clock,
|
||||
send_padding_if_silent_(
|
||||
IsEnabled(*field_trials_, "WebRTC-Pacer-PadInSilence")),
|
||||
pace_audio_(!IsDisabled(*field_trials_, "WebRTC-Pacer-BlockAudio")),
|
||||
send_side_bwe_with_overhead_(
|
||||
IsEnabled(*field_trials_, "WebRTC-SendSideBwe-WithOverhead")),
|
||||
min_packet_limit_(kDefaultMinPacketLimit),
|
||||
last_timestamp_(clock_->CurrentTime()),
|
||||
paused_(false),
|
||||
@ -155,6 +157,33 @@ bool PacingController::Congested() const {
|
||||
return false;
|
||||
}
|
||||
|
||||
DataSize PacingController::PacketSize(const RtpPacketToSend& packet) const {
|
||||
return DataSize::bytes(send_side_bwe_with_overhead_
|
||||
? packet.size()
|
||||
: packet.payload_size() + packet.padding_size());
|
||||
}
|
||||
|
||||
bool PacingController::ShouldSendPacket(const RtpPacketToSend& packet,
|
||||
PacedPacketInfo pacing_info) const {
|
||||
if (!pace_audio_ && packet.packet_type() == RtpPacketToSend::Type::kAudio) {
|
||||
// If audio, and we don't pace audio, pop packet regardless.
|
||||
return true;
|
||||
}
|
||||
// Pacing applies, check if we can.
|
||||
if (Congested()) {
|
||||
// Don't try to send more packets while we are congested.
|
||||
return false;
|
||||
} else if (media_budget_.bytes_remaining() == 0 &&
|
||||
pacing_info.probe_cluster_id == PacedPacketInfo::kNotAProbe) {
|
||||
// No budget left, and not a probe (which can override budget levels),
|
||||
// don't pop this packet.
|
||||
return false;
|
||||
}
|
||||
|
||||
// No blocks for sending packets found!
|
||||
return true;
|
||||
}
|
||||
|
||||
Timestamp PacingController::CurrentTime() const {
|
||||
Timestamp time = clock_->CurrentTime();
|
||||
if (time < last_timestamp_) {
|
||||
@ -197,7 +226,8 @@ void PacingController::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
|
||||
|
||||
RTC_CHECK(packet->packet_type());
|
||||
int priority = GetPriorityForType(*packet->packet_type());
|
||||
packet_queue_.Push(priority, now, packet_counter_++, std::move(packet));
|
||||
DataSize size = PacketSize(*packet);
|
||||
packet_queue_.Push(priority, now, packet_counter_++, size, std::move(packet));
|
||||
}
|
||||
|
||||
void PacingController::SetAccountForAudioPackets(bool account_for_audio) {
|
||||
@ -304,7 +334,7 @@ void PacingController::ProcessPackets() {
|
||||
// Assuming equal size packets and input/output rate, the average packet
|
||||
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
|
||||
// time constraint shall be met. Determine bitrate needed for that.
|
||||
packet_queue_.UpdateQueueTime(CurrentTime());
|
||||
packet_queue_.UpdateQueueTime(now);
|
||||
if (drain_large_queues_) {
|
||||
TimeDelta avg_time_left =
|
||||
std::max(TimeDelta::ms(1),
|
||||
@ -334,8 +364,15 @@ void PacingController::ProcessPackets() {
|
||||
// The paused state is checked in the loop since it leaves the critical
|
||||
// section allowing the paused state to be changed from other code.
|
||||
while (!paused_) {
|
||||
auto* packet = GetPendingPacket(pacing_info);
|
||||
if (packet == nullptr) {
|
||||
std::unique_ptr<RtpPacketToSend> rtp_packet;
|
||||
if (!packet_queue_.Empty()) {
|
||||
const RtpPacketToSend& stored_packet = packet_queue_.Top();
|
||||
if (ShouldSendPacket(stored_packet, pacing_info)) {
|
||||
rtp_packet = packet_queue_.Pop();
|
||||
}
|
||||
}
|
||||
|
||||
if (rtp_packet == nullptr) {
|
||||
// No packet available to send, check if we should send padding.
|
||||
DataSize padding_to_add = PaddingToAdd(recommended_probe_size, data_sent);
|
||||
if (padding_to_add > DataSize::Zero()) {
|
||||
@ -356,13 +393,25 @@ void PacingController::ProcessPackets() {
|
||||
break;
|
||||
}
|
||||
|
||||
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
|
||||
RTC_DCHECK(rtp_packet);
|
||||
const DataSize packet_size = PacketSize(*rtp_packet);
|
||||
const bool audio_packet =
|
||||
rtp_packet->packet_type() == RtpPacketToSend::Type::kAudio;
|
||||
packet_sender_->SendRtpPacket(std::move(rtp_packet), pacing_info);
|
||||
data_sent += packet_size;
|
||||
|
||||
if (!first_sent_packet_time_) {
|
||||
first_sent_packet_time_ = now;
|
||||
}
|
||||
|
||||
if (!audio_packet || account_for_audio_) {
|
||||
// Update media bytes sent.
|
||||
UpdateBudgetWithSentData(packet_size);
|
||||
last_send_time_ = now;
|
||||
}
|
||||
|
||||
padding_failure_state_ = false;
|
||||
|
||||
data_sent += packet->size();
|
||||
// Send succeeded, remove it from the queue.
|
||||
OnPacketSent(packet);
|
||||
if (recommended_probe_size && data_sent > *recommended_probe_size)
|
||||
break;
|
||||
}
|
||||
@ -404,44 +453,6 @@ DataSize PacingController::PaddingToAdd(
|
||||
return DataSize::bytes(padding_budget_.bytes_remaining());
|
||||
}
|
||||
|
||||
RoundRobinPacketQueue::QueuedPacket* PacingController::GetPendingPacket(
|
||||
const PacedPacketInfo& pacing_info) {
|
||||
if (packet_queue_.Empty()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
// Since we need to release the lock in order to send, we first pop the
|
||||
// element from the priority queue but keep it in storage, so that we can
|
||||
// reinsert it if send fails.
|
||||
RoundRobinPacketQueue::QueuedPacket* packet = packet_queue_.BeginPop();
|
||||
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
|
||||
bool apply_pacing = !audio_packet || pace_audio_;
|
||||
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
|
||||
pacing_info.probe_cluster_id ==
|
||||
PacedPacketInfo::kNotAProbe))) {
|
||||
packet_queue_.CancelPop();
|
||||
return nullptr;
|
||||
}
|
||||
return packet;
|
||||
}
|
||||
|
||||
void PacingController::OnPacketSent(
|
||||
RoundRobinPacketQueue::QueuedPacket* packet) {
|
||||
Timestamp now = CurrentTime();
|
||||
if (!first_sent_packet_time_) {
|
||||
first_sent_packet_time_ = now;
|
||||
}
|
||||
bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
|
||||
if (!audio_packet || account_for_audio_) {
|
||||
// Update media bytes sent.
|
||||
UpdateBudgetWithSentData(packet->size());
|
||||
last_send_time_ = now;
|
||||
}
|
||||
// Send succeeded, remove it from the queue.
|
||||
packet_queue_.FinalizePop();
|
||||
padding_failure_state_ = false;
|
||||
}
|
||||
|
||||
void PacingController::OnPaddingSent(DataSize data_sent) {
|
||||
if (data_sent > DataSize::Zero()) {
|
||||
UpdateBudgetWithSentData(data_sent);
|
||||
|
@ -145,10 +145,10 @@ class PacingController {
|
||||
DataSize PaddingToAdd(absl::optional<DataSize> recommended_probe_size,
|
||||
DataSize data_sent);
|
||||
|
||||
RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
|
||||
const PacedPacketInfo& pacing_info);
|
||||
void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet);
|
||||
void OnPaddingSent(DataSize padding_sent);
|
||||
DataSize PacketSize(const RtpPacketToSend& packet) const;
|
||||
bool ShouldSendPacket(const RtpPacketToSend& packet,
|
||||
PacedPacketInfo pacing_info) const;
|
||||
|
||||
Timestamp CurrentTime() const;
|
||||
|
||||
@ -160,6 +160,7 @@ class PacingController {
|
||||
const bool drain_large_queues_;
|
||||
const bool send_padding_if_silent_;
|
||||
const bool pace_audio_;
|
||||
const bool send_side_bwe_with_overhead_;
|
||||
TimeDelta min_packet_limit_;
|
||||
|
||||
// TODO(webrtc:9716): Remove this when we are certain clocks are monotonic.
|
||||
|
@ -27,22 +27,13 @@ RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
|
||||
|
||||
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
|
||||
int priority,
|
||||
RtpPacketToSend::Type type,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
Timestamp enqueue_time,
|
||||
DataSize size,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order,
|
||||
std::multiset<Timestamp>::iterator enqueue_time_it,
|
||||
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
|
||||
packet_it)
|
||||
: type_(type),
|
||||
priority_(priority),
|
||||
ssrc_(ssrc),
|
||||
sequence_number_(seq_number),
|
||||
capture_time_ms_(capture_time_ms),
|
||||
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it)
|
||||
: priority_(priority),
|
||||
enqueue_time_(enqueue_time),
|
||||
size_(size),
|
||||
retransmission_(retransmission),
|
||||
@ -52,7 +43,10 @@ RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
|
||||
|
||||
std::unique_ptr<RtpPacketToSend>
|
||||
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
|
||||
return packet_it_ ? std::move(**packet_it_) : nullptr;
|
||||
if (packet_it_->get() != nullptr) {
|
||||
return std::move(*packet_it_);
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTime(
|
||||
@ -74,13 +68,6 @@ RoundRobinPacketQueue::Stream::Stream() : size(DataSize::Zero()), ssrc(0) {}
|
||||
RoundRobinPacketQueue::Stream::Stream(const Stream& stream) = default;
|
||||
RoundRobinPacketQueue::Stream::~Stream() {}
|
||||
|
||||
bool IsEnabled(const WebRtcKeyValueConfig* field_trials, const char* name) {
|
||||
if (!field_trials) {
|
||||
return false;
|
||||
}
|
||||
return field_trials->Lookup(name).find("Enabled") == 0;
|
||||
}
|
||||
|
||||
RoundRobinPacketQueue::RoundRobinPacketQueue(
|
||||
Timestamp start_time,
|
||||
const WebRtcKeyValueConfig* field_trials)
|
||||
@ -90,116 +77,82 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(
|
||||
size_(DataSize::Zero()),
|
||||
max_size_(kMaxLeadingSize),
|
||||
queue_time_sum_(TimeDelta::Zero()),
|
||||
pause_time_sum_(TimeDelta::Zero()),
|
||||
send_side_bwe_with_overhead_(
|
||||
IsEnabled(field_trials, "WebRTC-SendSideBwe-WithOverhead")) {}
|
||||
pause_time_sum_(TimeDelta::Zero()) {}
|
||||
|
||||
RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
|
||||
|
||||
void RoundRobinPacketQueue::Push(int priority,
|
||||
RtpPacketToSend::Type type,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
Timestamp enqueue_time,
|
||||
DataSize size,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order) {
|
||||
Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
|
||||
enqueue_time, size, retransmission, enqueue_order,
|
||||
enqueue_times_.insert(enqueue_time), absl::nullopt));
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::Push(int priority,
|
||||
Timestamp enqueue_time,
|
||||
uint64_t enqueue_order,
|
||||
DataSize size,
|
||||
std::unique_ptr<RtpPacketToSend> packet) {
|
||||
uint32_t ssrc = packet->Ssrc();
|
||||
uint16_t sequence_number = packet->SequenceNumber();
|
||||
int64_t capture_time_ms = packet->capture_time_ms();
|
||||
DataSize size =
|
||||
DataSize::bytes(send_side_bwe_with_overhead_
|
||||
? packet->size()
|
||||
: packet->payload_size() + packet->padding_size());
|
||||
auto type = packet->packet_type();
|
||||
RTC_DCHECK(type.has_value());
|
||||
|
||||
const uint32_t ssrc = packet->Ssrc();
|
||||
const bool retransmission =
|
||||
packet->packet_type() == RtpPacketToSend::Type::kRetransmission;
|
||||
rtp_packets_.push_front(std::move(packet));
|
||||
Push(QueuedPacket(
|
||||
priority, *type, ssrc, sequence_number, capture_time_ms, enqueue_time,
|
||||
size, *type == RtpPacketToSend::Type::kRetransmission, enqueue_order,
|
||||
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()));
|
||||
Push(QueuedPacket(priority, enqueue_time, size, retransmission, enqueue_order,
|
||||
enqueue_times_.insert(enqueue_time), rtp_packets_.begin()),
|
||||
ssrc);
|
||||
}
|
||||
|
||||
RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
|
||||
RTC_CHECK(!pop_packet_ && !pop_stream_);
|
||||
const RtpPacketToSend& RoundRobinPacketQueue::Top() const {
|
||||
RTC_DCHECK(!Empty());
|
||||
|
||||
const Stream& stream = GetHighestPriorityStream();
|
||||
auto packet = stream.packet_queue.top();
|
||||
return packet.get_packet();
|
||||
}
|
||||
|
||||
std::unique_ptr<RtpPacketToSend> RoundRobinPacketQueue::Pop() {
|
||||
RTC_DCHECK(!Empty());
|
||||
|
||||
Stream* stream = GetHighestPriorityStream();
|
||||
pop_stream_.emplace(stream);
|
||||
pop_packet_.emplace(stream->packet_queue.top());
|
||||
auto packet = stream->packet_queue.top();
|
||||
stream->packet_queue.pop();
|
||||
|
||||
return &pop_packet_.value();
|
||||
}
|
||||
std::unique_ptr<RtpPacketToSend> rtp_packet = packet.ReleasePacket();
|
||||
|
||||
void RoundRobinPacketQueue::CancelPop() {
|
||||
RTC_CHECK(pop_packet_ && pop_stream_);
|
||||
(*pop_stream_)->packet_queue.push(*pop_packet_);
|
||||
pop_packet_.reset();
|
||||
pop_stream_.reset();
|
||||
}
|
||||
stream_priorities_.erase(stream->priority_it);
|
||||
|
||||
void RoundRobinPacketQueue::FinalizePop() {
|
||||
if (!Empty()) {
|
||||
RTC_CHECK(pop_packet_ && pop_stream_);
|
||||
Stream* stream = *pop_stream_;
|
||||
stream_priorities_.erase(stream->priority_it);
|
||||
const QueuedPacket& packet = *pop_packet_;
|
||||
// Calculate the total amount of time spent by this packet in the queue
|
||||
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
|
||||
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
|
||||
// by subtracting it now we effectively remove the time spent in in the
|
||||
// queue while in a paused state.
|
||||
TimeDelta time_in_non_paused_state =
|
||||
time_last_updated_ - packet.enqueue_time() - pause_time_sum_;
|
||||
queue_time_sum_ -= time_in_non_paused_state;
|
||||
|
||||
// Calculate the total amount of time spent by this packet in the queue
|
||||
// while in a non-paused state. Note that the |pause_time_sum_ms_| was
|
||||
// subtracted from |packet.enqueue_time_ms| when the packet was pushed, and
|
||||
// by subtracting it now we effectively remove the time spent in in the
|
||||
// queue while in a paused state.
|
||||
TimeDelta time_in_non_paused_state =
|
||||
time_last_updated_ - packet.enqueue_time() - pause_time_sum_;
|
||||
queue_time_sum_ -= time_in_non_paused_state;
|
||||
RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
|
||||
enqueue_times_.erase(packet.EnqueueTimeIterator());
|
||||
|
||||
RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
|
||||
enqueue_times_.erase(packet.EnqueueTimeIterator());
|
||||
rtp_packets_.erase(packet.PacketIterator());
|
||||
|
||||
auto packet_it = packet.PacketIterator();
|
||||
if (packet_it) {
|
||||
rtp_packets_.erase(*packet_it);
|
||||
}
|
||||
// Update |bytes| of this stream. The general idea is that the stream that
|
||||
// has sent the least amount of bytes should have the highest priority.
|
||||
// The problem with that is if streams send with different rates, in which
|
||||
// case a "budget" will be built up for the stream sending at the lower
|
||||
// rate. To avoid building a too large budget we limit |bytes| to be within
|
||||
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
|
||||
stream->size =
|
||||
std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize);
|
||||
max_size_ = std::max(max_size_, stream->size);
|
||||
|
||||
// Update |bytes| of this stream. The general idea is that the stream that
|
||||
// has sent the least amount of bytes should have the highest priority.
|
||||
// The problem with that is if streams send with different rates, in which
|
||||
// case a "budget" will be built up for the stream sending at the lower
|
||||
// rate. To avoid building a too large budget we limit |bytes| to be within
|
||||
// kMaxLeading bytes of the stream that has sent the most amount of bytes.
|
||||
stream->size =
|
||||
std::max(stream->size + packet.size(), max_size_ - kMaxLeadingSize);
|
||||
max_size_ = std::max(max_size_, stream->size);
|
||||
size_ -= packet.size();
|
||||
size_packets_ -= 1;
|
||||
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
|
||||
|
||||
size_ -= packet.size();
|
||||
size_packets_ -= 1;
|
||||
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ == TimeDelta::Zero());
|
||||
|
||||
// If there are packets left to be sent, schedule the stream again.
|
||||
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
|
||||
if (stream->packet_queue.empty()) {
|
||||
stream->priority_it = stream_priorities_.end();
|
||||
} else {
|
||||
int priority = stream->packet_queue.top().priority();
|
||||
stream->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(priority, stream->size), stream->ssrc);
|
||||
}
|
||||
|
||||
pop_packet_.reset();
|
||||
pop_stream_.reset();
|
||||
// If there are packets left to be sent, schedule the stream again.
|
||||
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
|
||||
if (stream->packet_queue.empty()) {
|
||||
stream->priority_it = stream_priorities_.end();
|
||||
} else {
|
||||
int priority = stream->packet_queue.top().priority();
|
||||
stream->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(priority, stream->size), stream->ssrc);
|
||||
}
|
||||
|
||||
return rtp_packet;
|
||||
}
|
||||
|
||||
bool RoundRobinPacketQueue::Empty() const {
|
||||
@ -252,12 +205,12 @@ TimeDelta RoundRobinPacketQueue::AverageQueueTime() const {
|
||||
return queue_time_sum_ / size_packets_;
|
||||
}
|
||||
|
||||
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
|
||||
auto stream_info_it = streams_.find(packet.ssrc());
|
||||
void RoundRobinPacketQueue::Push(QueuedPacket packet, uint32_t ssrc) {
|
||||
auto stream_info_it = streams_.find(ssrc);
|
||||
if (stream_info_it == streams_.end()) {
|
||||
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
|
||||
stream_info_it = streams_.emplace(ssrc, Stream()).first;
|
||||
stream_info_it->second.priority_it = stream_priorities_.end();
|
||||
stream_info_it->second.ssrc = packet.ssrc();
|
||||
stream_info_it->second.ssrc = ssrc;
|
||||
}
|
||||
|
||||
Stream* stream = &stream_info_it->second;
|
||||
@ -266,14 +219,14 @@ void RoundRobinPacketQueue::Push(QueuedPacket packet) {
|
||||
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
|
||||
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
|
||||
stream->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
|
||||
StreamPrioKey(packet.priority(), stream->size), ssrc);
|
||||
} else if (packet.priority() < stream->priority_it->first.priority) {
|
||||
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
|
||||
// and insert a new one with the new priority. Note that |priority_| uses
|
||||
// lower ordinal for higher priority.
|
||||
stream_priorities_.erase(stream->priority_it);
|
||||
stream->priority_it = stream_priorities_.emplace(
|
||||
StreamPrioKey(packet.priority(), stream->size), packet.ssrc());
|
||||
StreamPrioKey(packet.priority(), stream->size), ssrc);
|
||||
}
|
||||
RTC_CHECK(stream->priority_it != stream_priorities_.end());
|
||||
|
||||
@ -304,6 +257,18 @@ RoundRobinPacketQueue::GetHighestPriorityStream() {
|
||||
return &stream_info_it->second;
|
||||
}
|
||||
|
||||
const RoundRobinPacketQueue::Stream&
|
||||
RoundRobinPacketQueue::GetHighestPriorityStream() const {
|
||||
RTC_CHECK(!stream_priorities_.empty());
|
||||
uint32_t ssrc = stream_priorities_.begin()->second;
|
||||
|
||||
auto stream_info_it = streams_.find(ssrc);
|
||||
RTC_CHECK(stream_info_it != streams_.end());
|
||||
RTC_CHECK(stream_info_it->second.priority_it == stream_priorities_.begin());
|
||||
RTC_CHECK(!stream_info_it->second.packet_queue.empty());
|
||||
return stream_info_it->second;
|
||||
}
|
||||
|
||||
bool RoundRobinPacketQueue::IsSsrcScheduled(uint32_t ssrc) const {
|
||||
for (const auto& scheduled_stream : stream_priorities_) {
|
||||
if (scheduled_stream.second == ssrc)
|
||||
|
@ -37,80 +37,18 @@ class RoundRobinPacketQueue {
|
||||
const WebRtcKeyValueConfig* field_trials);
|
||||
~RoundRobinPacketQueue();
|
||||
|
||||
struct QueuedPacket {
|
||||
public:
|
||||
QueuedPacket(
|
||||
int priority,
|
||||
RtpPacketToSend::Type type,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
Timestamp enqueue_time,
|
||||
DataSize size,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order,
|
||||
std::multiset<Timestamp>::iterator enqueue_time_it,
|
||||
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
|
||||
packet_it);
|
||||
QueuedPacket(const QueuedPacket& rhs);
|
||||
~QueuedPacket();
|
||||
|
||||
bool operator<(const QueuedPacket& other) const;
|
||||
|
||||
int priority() const { return priority_; }
|
||||
RtpPacketToSend::Type type() const { return type_; }
|
||||
uint32_t ssrc() const { return ssrc_; }
|
||||
uint16_t sequence_number() const { return sequence_number_; }
|
||||
int64_t capture_time_ms() const { return capture_time_ms_; }
|
||||
Timestamp enqueue_time() const { return enqueue_time_; }
|
||||
DataSize size() const { return size_; }
|
||||
bool is_retransmission() const { return retransmission_; }
|
||||
uint64_t enqueue_order() const { return enqueue_order_; }
|
||||
std::unique_ptr<RtpPacketToSend> ReleasePacket();
|
||||
|
||||
// For internal use.
|
||||
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
|
||||
PacketIterator() const {
|
||||
return packet_it_;
|
||||
}
|
||||
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
|
||||
return enqueue_time_it_;
|
||||
}
|
||||
void SubtractPauseTime(TimeDelta pause_time_sum);
|
||||
|
||||
private:
|
||||
RtpPacketToSend::Type type_;
|
||||
int priority_;
|
||||
uint32_t ssrc_;
|
||||
uint16_t sequence_number_;
|
||||
int64_t capture_time_ms_; // Absolute time of frame capture.
|
||||
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
|
||||
DataSize size_;
|
||||
bool retransmission_;
|
||||
uint64_t enqueue_order_;
|
||||
std::multiset<Timestamp>::iterator enqueue_time_it_;
|
||||
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
|
||||
// if applicable.
|
||||
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
|
||||
packet_it_;
|
||||
};
|
||||
|
||||
void Push(int priority,
|
||||
RtpPacketToSend::Type type,
|
||||
uint32_t ssrc,
|
||||
uint16_t seq_number,
|
||||
int64_t capture_time_ms,
|
||||
Timestamp enqueue_time,
|
||||
DataSize size,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order);
|
||||
void Push(int priority,
|
||||
Timestamp enqueue_time,
|
||||
uint64_t enqueue_order,
|
||||
DataSize size,
|
||||
std::unique_ptr<RtpPacketToSend> packet);
|
||||
QueuedPacket* BeginPop();
|
||||
void CancelPop();
|
||||
void FinalizePop();
|
||||
|
||||
// Peek at the next packet in line to be sent. Note that this method must not
|
||||
// be called unless Empty() returns false.
|
||||
const RtpPacketToSend& Top() const;
|
||||
|
||||
// Remove the highest prio element from the queue and return it.
|
||||
std::unique_ptr<RtpPacketToSend> Pop();
|
||||
|
||||
bool Empty() const;
|
||||
size_t SizeInPackets() const;
|
||||
@ -122,6 +60,53 @@ class RoundRobinPacketQueue {
|
||||
void SetPauseState(bool paused, Timestamp now);
|
||||
|
||||
private:
|
||||
struct QueuedPacket {
|
||||
public:
|
||||
QueuedPacket(
|
||||
int priority,
|
||||
Timestamp enqueue_time,
|
||||
DataSize size,
|
||||
bool retransmission,
|
||||
uint64_t enqueue_order,
|
||||
std::multiset<Timestamp>::iterator enqueue_time_it,
|
||||
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it);
|
||||
QueuedPacket(const QueuedPacket& rhs);
|
||||
~QueuedPacket();
|
||||
|
||||
bool operator<(const QueuedPacket& other) const;
|
||||
|
||||
int priority() const { return priority_; }
|
||||
Timestamp enqueue_time() const { return enqueue_time_; }
|
||||
DataSize size() const { return size_; }
|
||||
bool is_retransmission() const { return retransmission_; }
|
||||
uint64_t enqueue_order() const { return enqueue_order_; }
|
||||
std::unique_ptr<RtpPacketToSend> ReleasePacket();
|
||||
|
||||
// For internal use.
|
||||
std::list<std::unique_ptr<RtpPacketToSend>>::iterator PacketIterator()
|
||||
const {
|
||||
return packet_it_;
|
||||
}
|
||||
std::multiset<Timestamp>::iterator EnqueueTimeIterator() const {
|
||||
return enqueue_time_it_;
|
||||
}
|
||||
void SubtractPauseTime(TimeDelta pause_time_sum);
|
||||
|
||||
const RtpPacketToSend& get_packet() const {
|
||||
RTC_DCHECK(packet_it_->get() != nullptr);
|
||||
return **packet_it_;
|
||||
}
|
||||
|
||||
int priority_;
|
||||
Timestamp enqueue_time_; // Absolute time of pacer queue entry.
|
||||
DataSize size_;
|
||||
bool retransmission_;
|
||||
uint64_t enqueue_order_;
|
||||
std::multiset<Timestamp>::iterator enqueue_time_it_;
|
||||
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned.
|
||||
std::list<std::unique_ptr<RtpPacketToSend>>::iterator packet_it_;
|
||||
};
|
||||
|
||||
struct StreamPrioKey {
|
||||
StreamPrioKey(int priority, DataSize size)
|
||||
: priority(priority), size(size) {}
|
||||
@ -154,16 +139,15 @@ class RoundRobinPacketQueue {
|
||||
std::multimap<StreamPrioKey, uint32_t>::iterator priority_it;
|
||||
};
|
||||
|
||||
void Push(QueuedPacket packet);
|
||||
void Push(QueuedPacket packet, uint32_t ssrc);
|
||||
|
||||
Stream* GetHighestPriorityStream();
|
||||
const Stream& GetHighestPriorityStream() const;
|
||||
|
||||
// Just used to verify correctness.
|
||||
bool IsSsrcScheduled(uint32_t ssrc) const;
|
||||
|
||||
Timestamp time_last_updated_;
|
||||
absl::optional<QueuedPacket> pop_packet_;
|
||||
absl::optional<Stream*> pop_stream_;
|
||||
|
||||
bool paused_;
|
||||
size_t size_packets_;
|
||||
@ -190,8 +174,6 @@ class RoundRobinPacketQueue {
|
||||
// end iterator of this list if queue does not have direct ownership of the
|
||||
// packet.
|
||||
std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
|
||||
|
||||
const bool send_side_bwe_with_overhead_;
|
||||
};
|
||||
} // namespace webrtc
|
||||
|
||||
|
@ -57,14 +57,14 @@ class RtpPacketToSend : public RtpPacket {
|
||||
void set_retransmitted_sequence_number(uint16_t sequence_number) {
|
||||
retransmitted_sequence_number_ = sequence_number;
|
||||
}
|
||||
absl::optional<uint16_t> retransmitted_sequence_number() {
|
||||
absl::optional<uint16_t> retransmitted_sequence_number() const {
|
||||
return retransmitted_sequence_number_;
|
||||
}
|
||||
|
||||
void set_allow_retransmission(bool allow_retransmission) {
|
||||
allow_retransmission_ = allow_retransmission;
|
||||
}
|
||||
bool allow_retransmission() { return allow_retransmission_; }
|
||||
bool allow_retransmission() const { return allow_retransmission_; }
|
||||
|
||||
// Additional data bound to the RTP packet for use in application code,
|
||||
// outside of WebRTC.
|
||||
|
Reference in New Issue
Block a user