|
|
|
|
@ -10,12 +10,11 @@
|
|
|
|
|
|
|
|
|
|
#include "webrtc/modules/pacing/paced_sender.h"
|
|
|
|
|
|
|
|
|
|
#include <assert.h>
|
|
|
|
|
|
|
|
|
|
#include <map>
|
|
|
|
|
#include <queue>
|
|
|
|
|
#include <set>
|
|
|
|
|
|
|
|
|
|
#include "webrtc/base/checks.h"
|
|
|
|
|
#include "webrtc/modules/include/module_common_types.h"
|
|
|
|
|
#include "webrtc/modules/pacing/bitrate_prober.h"
|
|
|
|
|
#include "webrtc/system_wrappers/include/clock.h"
|
|
|
|
|
@ -86,7 +85,11 @@ struct Comparator {
|
|
|
|
|
// Class encapsulating a priority queue with some extensions.
|
|
|
|
|
class PacketQueue {
|
|
|
|
|
public:
|
|
|
|
|
PacketQueue() : bytes_(0) {}
|
|
|
|
|
explicit PacketQueue(Clock* clock)
|
|
|
|
|
: bytes_(0),
|
|
|
|
|
clock_(clock),
|
|
|
|
|
queue_time_sum_(0),
|
|
|
|
|
time_last_updated_(clock_->TimeInMilliseconds()) {}
|
|
|
|
|
virtual ~PacketQueue() {}
|
|
|
|
|
|
|
|
|
|
void Push(const Packet& packet) {
|
|
|
|
|
@ -114,6 +117,7 @@ class PacketQueue {
|
|
|
|
|
void FinalizePop(const Packet& packet) {
|
|
|
|
|
RemoveFromDupeSet(packet);
|
|
|
|
|
bytes_ -= packet.bytes;
|
|
|
|
|
queue_time_sum_ -= (time_last_updated_ - packet.enqueue_time_ms);
|
|
|
|
|
packet_list_.erase(packet.this_it);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -123,13 +127,22 @@ class PacketQueue {
|
|
|
|
|
|
|
|
|
|
uint64_t SizeInBytes() const { return bytes_; }
|
|
|
|
|
|
|
|
|
|
int64_t OldestEnqueueTime() const {
|
|
|
|
|
std::list<Packet>::const_reverse_iterator it = packet_list_.rbegin();
|
|
|
|
|
int64_t OldestEnqueueTimeMs() const {
|
|
|
|
|
auto it = packet_list_.rbegin();
|
|
|
|
|
if (it == packet_list_.rend())
|
|
|
|
|
return 0;
|
|
|
|
|
return it->enqueue_time_ms;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t AverageQueueTimeMs() {
|
|
|
|
|
int64_t now = clock_->TimeInMilliseconds();
|
|
|
|
|
RTC_DCHECK_GE(now, time_last_updated_);
|
|
|
|
|
int64_t delta = now - time_last_updated_;
|
|
|
|
|
queue_time_sum_ += delta * prio_queue_.size();
|
|
|
|
|
time_last_updated_ = now;
|
|
|
|
|
return queue_time_sum_ / prio_queue_.size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
// Try to add a packet to the set of ssrc/seqno identifiers currently in the
|
|
|
|
|
// queue. Return true if inserted, false if this is a duplicate.
|
|
|
|
|
@ -147,7 +160,7 @@ class PacketQueue {
|
|
|
|
|
|
|
|
|
|
void RemoveFromDupeSet(const Packet& packet) {
|
|
|
|
|
SsrcSeqNoMap::iterator it = dupe_map_.find(packet.ssrc);
|
|
|
|
|
assert(it != dupe_map_.end());
|
|
|
|
|
RTC_DCHECK(it != dupe_map_.end());
|
|
|
|
|
it->second.erase(packet.sequence_number);
|
|
|
|
|
if (it->second.empty()) {
|
|
|
|
|
dupe_map_.erase(it);
|
|
|
|
|
@ -165,6 +178,9 @@ class PacketQueue {
|
|
|
|
|
// Map<ssrc, set<seq_no> >, for checking duplicates.
|
|
|
|
|
typedef std::map<uint32_t, std::set<uint16_t> > SsrcSeqNoMap;
|
|
|
|
|
SsrcSeqNoMap dupe_map_;
|
|
|
|
|
Clock* const clock_;
|
|
|
|
|
int64_t queue_time_sum_;
|
|
|
|
|
int64_t time_last_updated_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class IntervalBudget {
|
|
|
|
|
@ -209,6 +225,7 @@ class IntervalBudget {
|
|
|
|
|
};
|
|
|
|
|
} // namespace paced_sender
|
|
|
|
|
|
|
|
|
|
const int64_t PacedSender::kMaxQueueLengthMs = 2000;
|
|
|
|
|
const float PacedSender::kDefaultPaceMultiplier = 2.5f;
|
|
|
|
|
|
|
|
|
|
PacedSender::PacedSender(Clock* clock,
|
|
|
|
|
@ -225,8 +242,9 @@ PacedSender::PacedSender(Clock* clock,
|
|
|
|
|
padding_budget_(new paced_sender::IntervalBudget(min_bitrate_kbps)),
|
|
|
|
|
prober_(new BitrateProber()),
|
|
|
|
|
bitrate_bps_(1000 * bitrate_kbps),
|
|
|
|
|
max_bitrate_kbps_(max_bitrate_kbps),
|
|
|
|
|
time_last_update_us_(clock->TimeInMicroseconds()),
|
|
|
|
|
packets_(new paced_sender::PacketQueue()),
|
|
|
|
|
packets_(new paced_sender::PacketQueue(clock)),
|
|
|
|
|
packet_counter_(0) {
|
|
|
|
|
UpdateBytesPerInterval(kMinPacketLimitMs);
|
|
|
|
|
}
|
|
|
|
|
@ -244,7 +262,7 @@ void PacedSender::Resume() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PacedSender::SetProbingEnabled(bool enabled) {
|
|
|
|
|
assert(packet_counter_ == 0);
|
|
|
|
|
RTC_CHECK_EQ(0u, packet_counter_);
|
|
|
|
|
probing_enabled_ = enabled;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@ -252,9 +270,12 @@ void PacedSender::UpdateBitrate(int bitrate_kbps,
|
|
|
|
|
int max_bitrate_kbps,
|
|
|
|
|
int min_bitrate_kbps) {
|
|
|
|
|
CriticalSectionScoped cs(critsect_.get());
|
|
|
|
|
media_budget_->set_target_rate_kbps(max_bitrate_kbps);
|
|
|
|
|
// Don't set media bitrate here as it may be boosted in order to meet max
|
|
|
|
|
// queue time constraint. Just update max_bitrate_kbps_ and let media_budget_
|
|
|
|
|
// be updated in Process().
|
|
|
|
|
padding_budget_->set_target_rate_kbps(min_bitrate_kbps);
|
|
|
|
|
bitrate_bps_ = 1000 * bitrate_kbps;
|
|
|
|
|
max_bitrate_kbps_ = max_bitrate_kbps;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
|
|
|
|
@ -265,14 +286,12 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
|
|
|
|
bool retransmission) {
|
|
|
|
|
CriticalSectionScoped cs(critsect_.get());
|
|
|
|
|
|
|
|
|
|
if (probing_enabled_ && !prober_->IsProbing()) {
|
|
|
|
|
if (probing_enabled_ && !prober_->IsProbing())
|
|
|
|
|
prober_->SetEnabled(true);
|
|
|
|
|
}
|
|
|
|
|
prober_->MaybeInitializeProbe(bitrate_bps_);
|
|
|
|
|
|
|
|
|
|
if (capture_time_ms < 0) {
|
|
|
|
|
if (capture_time_ms < 0)
|
|
|
|
|
capture_time_ms = clock_->TimeInMilliseconds();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
packets_->Push(paced_sender::Packet(
|
|
|
|
|
priority, ssrc, sequence_number, capture_time_ms,
|
|
|
|
|
@ -281,9 +300,8 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
|
|
|
|
|
|
|
|
|
|
int64_t PacedSender::ExpectedQueueTimeMs() const {
|
|
|
|
|
CriticalSectionScoped cs(critsect_.get());
|
|
|
|
|
int target_rate = media_budget_->target_rate_kbps();
|
|
|
|
|
assert(target_rate > 0);
|
|
|
|
|
return static_cast<int64_t>(packets_->SizeInBytes() * 8 / target_rate);
|
|
|
|
|
RTC_DCHECK_GT(max_bitrate_kbps_, 0);
|
|
|
|
|
return static_cast<int64_t>(packets_->SizeInBytes() * 8 / max_bitrate_kbps_);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
size_t PacedSender::QueueSizePackets() const {
|
|
|
|
|
@ -294,7 +312,7 @@ size_t PacedSender::QueueSizePackets() const {
|
|
|
|
|
int64_t PacedSender::QueueInMs() const {
|
|
|
|
|
CriticalSectionScoped cs(critsect_.get());
|
|
|
|
|
|
|
|
|
|
int64_t oldest_packet = packets_->OldestEnqueueTime();
|
|
|
|
|
int64_t oldest_packet = packets_->OldestEnqueueTimeMs();
|
|
|
|
|
if (oldest_packet == 0)
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
@ -305,9 +323,8 @@ int64_t PacedSender::TimeUntilNextProcess() {
|
|
|
|
|
CriticalSectionScoped cs(critsect_.get());
|
|
|
|
|
if (prober_->IsProbing()) {
|
|
|
|
|
int64_t ret = prober_->TimeUntilNextProbe(clock_->TimeInMilliseconds());
|
|
|
|
|
if (ret >= 0) {
|
|
|
|
|
if (ret >= 0)
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
int64_t elapsed_time_us = clock_->TimeInMicroseconds() - time_last_update_us_;
|
|
|
|
|
int64_t elapsed_time_ms = (elapsed_time_us + 500) / 1000;
|
|
|
|
|
@ -321,14 +338,29 @@ int32_t PacedSender::Process() {
|
|
|
|
|
time_last_update_us_ = now_us;
|
|
|
|
|
if (paused_)
|
|
|
|
|
return 0;
|
|
|
|
|
int target_bitrate_kbps = max_bitrate_kbps_;
|
|
|
|
|
if (elapsed_time_ms > 0) {
|
|
|
|
|
size_t queue_size_bytes = packets_->SizeInBytes();
|
|
|
|
|
if (queue_size_bytes > 0) {
|
|
|
|
|
// Assuming equal size packets and input/output rate, the average packet
|
|
|
|
|
// has avg_time_left_ms left to get queue_size_bytes out of the queue, if
|
|
|
|
|
// time constraint shall be met. Determine bitrate needed for that.
|
|
|
|
|
int64_t avg_time_left_ms = std::max<int64_t>(
|
|
|
|
|
1, kMaxQueueLengthMs - packets_->AverageQueueTimeMs());
|
|
|
|
|
int min_bitrate_needed_kbps =
|
|
|
|
|
static_cast<int>(queue_size_bytes * 8 / avg_time_left_ms);
|
|
|
|
|
if (min_bitrate_needed_kbps > target_bitrate_kbps)
|
|
|
|
|
target_bitrate_kbps = min_bitrate_needed_kbps;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
media_budget_->set_target_rate_kbps(target_bitrate_kbps);
|
|
|
|
|
|
|
|
|
|
int64_t delta_time_ms = std::min(kMaxIntervalTimeMs, elapsed_time_ms);
|
|
|
|
|
UpdateBytesPerInterval(delta_time_ms);
|
|
|
|
|
}
|
|
|
|
|
while (!packets_->Empty()) {
|
|
|
|
|
if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing()) {
|
|
|
|
|
if (media_budget_->bytes_remaining() == 0 && !prober_->IsProbing())
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Since we need to release the lock in order to send, we first pop the
|
|
|
|
|
// element from the priority queue but keep it in storage, so that we can
|
|
|
|
|
@ -337,9 +369,8 @@ int32_t PacedSender::Process() {
|
|
|
|
|
if (SendPacket(packet)) {
|
|
|
|
|
// Send succeeded, remove it from the queue.
|
|
|
|
|
packets_->FinalizePop(packet);
|
|
|
|
|
if (prober_->IsProbing()) {
|
|
|
|
|
if (prober_->IsProbing())
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// Send failed, put it back into the queue.
|
|
|
|
|
packets_->CancelPop(packet);
|
|
|
|
|
@ -351,10 +382,11 @@ int32_t PacedSender::Process() {
|
|
|
|
|
return 0;
|
|
|
|
|
|
|
|
|
|
size_t padding_needed;
|
|
|
|
|
if (prober_->IsProbing())
|
|
|
|
|
if (prober_->IsProbing()) {
|
|
|
|
|
padding_needed = prober_->RecommendedPacketSize();
|
|
|
|
|
else
|
|
|
|
|
} else {
|
|
|
|
|
padding_needed = padding_budget_->bytes_remaining();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (padding_needed > 0)
|
|
|
|
|
SendPadding(static_cast<size_t>(padding_needed));
|
|
|
|
|
|