Add support within PacedSender and pacer queue for owning rtp packets.

This CL builds on https://webrtc-review.googlesource.com/c/src/+/142165
It adds the parts within the paced sender that uses those send methods.
A follow-up will add the pre-pacer RTP sender parts. That CL will also
add proper integration testing. Here, I mostly add coverage for the new
send methods. When the old code-path is removed, all tests need to be
converted to exclusively use the owned path.

Bug: webrtc:10633
Change-Id: I870d9a2285f07a7b7b0ef6758aa310808f210f28
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/142179
Commit-Queue: Erik Språng <sprang@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Sebastian Jansson <srte@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#28308}
This commit is contained in:
Erik Språng
2019-06-18 16:20:11 +02:00
committed by Commit Bot
parent b028c6a8ff
commit 58ee187554
10 changed files with 413 additions and 135 deletions

View File

@ -85,6 +85,7 @@ if (rtc_include_tests) {
"../rtp_rtcp", "../rtp_rtcp",
"../rtp_rtcp:mock_rtp_rtcp", "../rtp_rtcp:mock_rtp_rtcp",
"../rtp_rtcp:rtp_rtcp_format", "../rtp_rtcp:rtp_rtcp_format",
"//third_party/abseil-cpp/absl/memory",
] ]
} }

View File

@ -44,6 +44,27 @@ bool IsEnabled(const WebRtcKeyValueConfig& field_trials,
return field_trials.Lookup(key).find("Enabled") == 0; return field_trials.Lookup(key).find("Enabled") == 0;
} }
int GetPriorityForType(RtpPacketToSend::Type type) {
switch (type) {
case RtpPacketToSend::Type::kAudio:
// Audio is always prioritized over other packet types.
return 0;
case RtpPacketToSend::Type::kRetransmission:
// Send retransmissions before new media.
return 1;
case RtpPacketToSend::Type::kVideo:
// Video has "normal" priority, in the old speak.
return 2;
case RtpPacketToSend::Type::kForwardErrorCorrection:
// Redundancy is OK to drop, but the content is hopefully not useless.
return 3;
case RtpPacketToSend::Type::kPadding:
// Packets that are in themselves likely useless, only sent to keep the
// BWE high.
return 4;
}
}
} // namespace } // namespace
const int64_t PacedSender::kMaxQueueLengthMs = 2000; const int64_t PacedSender::kMaxQueueLengthMs = 2000;
const float PacedSender::kDefaultPaceMultiplier = 2.5f; const float PacedSender::kDefaultPaceMultiplier = 2.5f;
@ -186,9 +207,37 @@ void PacedSender::InsertPacket(RtpPacketSender::Priority priority,
if (capture_time_ms < 0) if (capture_time_ms < 0)
capture_time_ms = now_ms; capture_time_ms = now_ms;
packets_.Push(RoundRobinPacketQueue::Packet( RtpPacketToSend::Type type;
priority, ssrc, sequence_number, capture_time_ms, now_ms, bytes, switch (priority) {
retransmission, packet_counter_++)); case RtpPacketPacer::kHighPriority:
type = RtpPacketToSend::Type::kAudio;
break;
case RtpPacketPacer::kNormalPriority:
type = RtpPacketToSend::Type::kRetransmission;
break;
default:
type = RtpPacketToSend::Type::kVideo;
}
packets_.Push(GetPriorityForType(type), type, ssrc, sequence_number,
capture_time_ms, now_ms, bytes, retransmission,
packet_counter_++);
}
void PacedSender::EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) {
rtc::CritScope cs(&critsect_);
RTC_DCHECK(pacing_bitrate_kbps_ > 0)
<< "SetPacingRate must be called before InsertPacket.";
int64_t now_ms = TimeMilliseconds();
prober_.OnIncomingPacket(packet->payload_size());
if (packet->capture_time_ms() < 0) {
packet->set_capture_time_ms(now_ms);
}
RTC_CHECK(packet->packet_type());
int priority = GetPriorityForType(*packet->packet_type());
packets_.Push(priority, now_ms, packet_counter_++, std::move(packet));
} }
void PacedSender::SetAccountForAudioPackets(bool account_for_audio) { void PacedSender::SetAccountForAudioPackets(bool account_for_audio) {
@ -324,27 +373,43 @@ void PacedSender::Process() {
// The paused state is checked in the loop since it leaves the critical // The paused state is checked in the loop since it leaves the critical
// section allowing the paused state to be changed from other code. // section allowing the paused state to be changed from other code.
while (!packets_.Empty() && !paused_) { while (!packets_.Empty() && !paused_) {
const auto* packet = GetPendingPacket(pacing_info); auto* packet = GetPendingPacket(pacing_info);
if (packet == nullptr) if (packet == nullptr)
break; break;
std::unique_ptr<RtpPacketToSend> rtp_packet = packet->ReleasePacket();
const bool owned_rtp_packet = rtp_packet != nullptr;
critsect_.Leave(); critsect_.Leave();
RtpPacketSendResult success = packet_router_->TimeToSendPacket(
packet->ssrc, packet->sequence_number, packet->capture_time_ms, RtpPacketSendResult success;
packet->retransmission, pacing_info); if (rtp_packet != nullptr) {
packet_router_->SendPacket(std::move(rtp_packet), pacing_info);
success = RtpPacketSendResult::kSuccess;
} else {
success = packet_router_->TimeToSendPacket(
packet->ssrc(), packet->sequence_number(), packet->capture_time_ms(),
packet->is_retransmission(), pacing_info);
}
critsect_.Enter(); critsect_.Enter();
if (success == RtpPacketSendResult::kSuccess || if (success == RtpPacketSendResult::kSuccess ||
success == RtpPacketSendResult::kPacketNotFound) { success == RtpPacketSendResult::kPacketNotFound) {
// Packet sent or invalid packet, remove it from queue. // Packet sent or invalid packet, remove it from queue.
// TODO(webrtc:8052): Don't consume media budget on kInvalid. // TODO(webrtc:8052): Don't consume media budget on kInvalid.
bytes_sent += packet->bytes; bytes_sent += packet->size_in_bytes();
// Send succeeded, remove it from the queue. // Send succeeded, remove it from the queue.
OnPacketSent(packet); OnPacketSent(packet);
if (is_probing && bytes_sent > recommended_probe_size) if (is_probing && bytes_sent > recommended_probe_size)
break; break;
} else if (owned_rtp_packet) {
// Send failed, but we can't put it back in the queue, remove it without
// consuming budget.
packets_.FinalizePop();
break;
} else { } else {
// Send failed, put it back into the queue. // Send failed, put it back into the queue.
packets_.CancelPop(*packet); packets_.CancelPop();
break; break;
} }
} }
@ -379,34 +444,34 @@ void PacedSender::ProcessThreadAttached(ProcessThread* process_thread) {
process_thread_ = process_thread; process_thread_ = process_thread;
} }
const RoundRobinPacketQueue::Packet* PacedSender::GetPendingPacket( RoundRobinPacketQueue::QueuedPacket* PacedSender::GetPendingPacket(
const PacedPacketInfo& pacing_info) { const PacedPacketInfo& pacing_info) {
// Since we need to release the lock in order to send, we first pop the // Since we need to release the lock in order to send, we first pop the
// element from the priority queue but keep it in storage, so that we can // element from the priority queue but keep it in storage, so that we can
// reinsert it if send fails. // reinsert it if send fails.
const RoundRobinPacketQueue::Packet* packet = &packets_.BeginPop(); RoundRobinPacketQueue::QueuedPacket* packet = packets_.BeginPop();
bool audio_packet = packet->priority == kHighPriority; bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
bool apply_pacing = !audio_packet || pace_audio_; bool apply_pacing = !audio_packet || pace_audio_;
if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 && if (apply_pacing && (Congested() || (media_budget_.bytes_remaining() == 0 &&
pacing_info.probe_cluster_id == pacing_info.probe_cluster_id ==
PacedPacketInfo::kNotAProbe))) { PacedPacketInfo::kNotAProbe))) {
packets_.CancelPop(*packet); packets_.CancelPop();
return nullptr; return nullptr;
} }
return packet; return packet;
} }
void PacedSender::OnPacketSent(const RoundRobinPacketQueue::Packet* packet) { void PacedSender::OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet) {
if (first_sent_packet_ms_ == -1) if (first_sent_packet_ms_ == -1)
first_sent_packet_ms_ = TimeMilliseconds(); first_sent_packet_ms_ = TimeMilliseconds();
bool audio_packet = packet->priority == kHighPriority; bool audio_packet = packet->type() == RtpPacketToSend::Type::kAudio;
if (!audio_packet || account_for_audio_) { if (!audio_packet || account_for_audio_) {
// Update media bytes sent. // Update media bytes sent.
UpdateBudgetWithBytesSent(packet->bytes); UpdateBudgetWithBytesSent(packet->size_in_bytes());
last_send_time_us_ = clock_->TimeInMicroseconds(); last_send_time_us_ = clock_->TimeInMicroseconds();
} }
// Send succeeded, remove it from the queue. // Send succeeded, remove it from the queue.
packets_.FinalizePop(*packet); packets_.FinalizePop();
} }
void PacedSender::OnPaddingSent(size_t bytes_sent) { void PacedSender::OnPaddingSent(size_t bytes_sent) {

View File

@ -25,7 +25,9 @@
#include "modules/pacing/interval_budget.h" #include "modules/pacing/interval_budget.h"
#include "modules/pacing/packet_router.h" #include "modules/pacing/packet_router.h"
#include "modules/pacing/round_robin_packet_queue.h" #include "modules/pacing/round_robin_packet_queue.h"
#include "modules/rtp_rtcp/include/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/utility/include/process_thread.h" #include "modules/utility/include/process_thread.h"
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/experiments/field_trial_parser.h" #include "rtc_base/experiments/field_trial_parser.h"
@ -35,7 +37,7 @@ namespace webrtc {
class Clock; class Clock;
class RtcEventLog; class RtcEventLog;
class PacedSender : public Module, public RtpPacketSender { class PacedSender : public Module, public RtpPacketPacer {
public: public:
static constexpr int64_t kNoCongestionWindow = -1; static constexpr int64_t kNoCongestionWindow = -1;
@ -77,8 +79,8 @@ class PacedSender : public Module, public RtpPacketSender {
// Sets the pacing rates. Must be called once before packets can be sent. // Sets the pacing rates. Must be called once before packets can be sent.
void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps); void SetPacingRates(uint32_t pacing_rate_bps, uint32_t padding_rate_bps);
// Returns true if we send the packet now, else it will add the packet // Adds the packet information to the queue and calls TimeToSendPacket
// information to the queue and call TimeToSendPacket when it's time to send. // when it's time to send.
void InsertPacket(RtpPacketSender::Priority priority, void InsertPacket(RtpPacketSender::Priority priority,
uint32_t ssrc, uint32_t ssrc,
uint16_t sequence_number, uint16_t sequence_number,
@ -86,6 +88,10 @@ class PacedSender : public Module, public RtpPacketSender {
size_t bytes, size_t bytes,
bool retransmission) override; bool retransmission) override;
// Adds the packet to the queue and calls PacketRouter::SendPacket() when
// it's time to send.
void EnqueuePacket(std::unique_ptr<RtpPacketToSend> packet) override;
// Currently audio traffic is not accounted by pacer and passed through. // Currently audio traffic is not accounted by pacer and passed through.
// With the introduction of audio BWE audio traffic will be accounted for // With the introduction of audio BWE audio traffic will be accounted for
// the pacer budget calculation. The audio traffic still will be injected // the pacer budget calculation. The audio traffic still will be injected
@ -129,10 +135,10 @@ class PacedSender : public Module, public RtpPacketSender {
void UpdateBudgetWithBytesSent(size_t bytes) void UpdateBudgetWithBytesSent(size_t bytes)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
const RoundRobinPacketQueue::Packet* GetPendingPacket( RoundRobinPacketQueue::QueuedPacket* GetPendingPacket(
const PacedPacketInfo& pacing_info) const PacedPacketInfo& pacing_info)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPacketSent(const RoundRobinPacketQueue::Packet* packet) void OnPacketSent(RoundRobinPacketQueue::QueuedPacket* packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);
void OnPaddingSent(size_t padding_sent) void OnPaddingSent(size_t padding_sent)
RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_); RTC_EXCLUSIVE_LOCKS_REQUIRED(critsect_);

View File

@ -12,6 +12,7 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include "absl/memory/memory.h"
#include "modules/pacing/paced_sender.h" #include "modules/pacing/paced_sender.h"
#include "modules/pacing/packet_router.h" #include "modules/pacing/packet_router.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
@ -22,6 +23,8 @@
using ::testing::_; using ::testing::_;
using ::testing::Field; using ::testing::Field;
using ::testing::Pointee;
using ::testing::Property;
using ::testing::Return; using ::testing::Return;
namespace { namespace {
@ -34,6 +37,11 @@ constexpr unsigned kSecondClusterBps = 1800000;
constexpr int kBitrateProbingError = 150000; constexpr int kBitrateProbingError = 150000;
const float kPaceMultiplier = 2.5f; const float kPaceMultiplier = 2.5f;
constexpr uint32_t kAudioSsrc = 12345;
constexpr uint32_t kVideoSsrc = 234565;
constexpr uint32_t kVideoRtxSsrc = 34567;
constexpr uint32_t kFlexFecSsrc = 45678;
} // namespace } // namespace
namespace webrtc { namespace webrtc {
@ -49,6 +57,9 @@ class MockPacedSenderCallback : public PacketRouter {
int64_t capture_time_ms, int64_t capture_time_ms,
bool retransmission, bool retransmission,
const PacedPacketInfo& pacing_info)); const PacedPacketInfo& pacing_info));
MOCK_METHOD2(SendPacket,
void(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& pacing_info));
MOCK_METHOD2(TimeToSendPadding, MOCK_METHOD2(TimeToSendPadding,
size_t(size_t bytes, const PacedPacketInfo& pacing_info)); size_t(size_t bytes, const PacedPacketInfo& pacing_info));
}; };
@ -139,6 +150,30 @@ class PacedSenderTest : public ::testing::TestWithParam<std::string> {
.Times(1) .Times(1)
.WillRepeatedly(Return(RtpPacketSendResult::kSuccess)); .WillRepeatedly(Return(RtpPacketSendResult::kSuccess));
} }
std::unique_ptr<RtpPacketToSend> BuildRtpPacket(RtpPacketToSend::Type type) {
auto packet = absl::make_unique<RtpPacketToSend>(nullptr);
packet->set_packet_type(type);
switch (type) {
case RtpPacketToSend::Type::kAudio:
packet->SetSsrc(kAudioSsrc);
break;
case RtpPacketToSend::Type::kVideo:
packet->SetSsrc(kVideoSsrc);
break;
case RtpPacketToSend::Type::kRetransmission:
case RtpPacketToSend::Type::kPadding:
packet->SetSsrc(kVideoRtxSsrc);
break;
case RtpPacketToSend::Type::kForwardErrorCorrection:
packet->SetSsrc(kFlexFecSsrc);
break;
}
packet->SetPayloadSize(234);
return packet;
}
SimulatedClock clock_; SimulatedClock clock_;
MockPacedSenderCallback callback_; MockPacedSenderCallback callback_;
std::unique_ptr<PacedSender> send_bucket_; std::unique_ptr<PacedSender> send_bucket_;
@ -1292,6 +1327,39 @@ TEST_F(PacedSenderTest, AvoidBusyLoopOnSendFailure) {
EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess()); EXPECT_EQ(5, send_bucket_->TimeUntilNextProcess());
} }
TEST_F(PacedSenderTest, OwnedPacketPrioritizedOnType) {
// Insert a packet of each type, from low to high priority. Since priority
// is weighted higher than insert order, these should come out of the pacer
// in backwards order.
for (RtpPacketToSend::Type type :
{RtpPacketToSend::Type::kPadding,
RtpPacketToSend::Type::kForwardErrorCorrection,
RtpPacketToSend::Type::kVideo, RtpPacketToSend::Type::kRetransmission,
RtpPacketToSend::Type::kAudio}) {
send_bucket_->EnqueuePacket(BuildRtpPacket(type));
}
::testing::InSequence seq;
EXPECT_CALL(
callback_,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kAudioSsrc)), _));
EXPECT_CALL(
callback_,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
EXPECT_CALL(
callback_,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoSsrc)), _));
EXPECT_CALL(
callback_,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kFlexFecSsrc)), _));
EXPECT_CALL(
callback_,
SendPacket(Pointee(Property(&RtpPacketToSend::Ssrc, kVideoRtxSsrc)), _));
clock_.AdvanceTimeMilliseconds(200);
send_bucket_->Process();
}
// TODO(philipel): Move to PacketQueue2 unittests. // TODO(philipel): Move to PacketQueue2 unittests.
#if 0 #if 0
TEST_F(PacedSenderTest, QueueTimeWithPause) { TEST_F(PacedSenderTest, QueueTimeWithPause) {

View File

@ -13,6 +13,7 @@
#include <algorithm> #include <algorithm>
#include <cstdint> #include <cstdint>
#include <limits> #include <limits>
#include <utility>
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "modules/rtp_rtcp/include/rtp_rtcp.h" #include "modules/rtp_rtcp/include/rtp_rtcp.h"
@ -20,6 +21,7 @@
#include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h" #include "modules/rtp_rtcp/source/rtcp_packet/transport_feedback.h"
#include "rtc_base/atomic_ops.h" #include "rtc_base/atomic_ops.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/logging.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
namespace webrtc { namespace webrtc {
@ -125,6 +127,29 @@ RtpPacketSendResult PacketRouter::TimeToSendPacket(
return RtpPacketSendResult::kPacketNotFound; return RtpPacketSendResult::kPacketNotFound;
} }
void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info) {
rtc::CritScope cs(&modules_crit_);
for (auto* rtp_module : rtp_send_modules_) {
if (rtp_module->TrySendPacket(packet.get(), cluster_info)) {
const bool can_send_padding =
(rtp_module->RtxSendStatus() & kRtxRedundantPayloads) &&
rtp_module->HasBweExtensions();
if (can_send_padding) {
// This is now the last module to send media, and has the desired
// properties needed for payload based padding. Cache it for later use.
last_send_module_ = rtp_module;
}
return;
}
}
RTC_LOG(LS_WARNING) << "Failed to send packet, matching RTP module not found "
"or transport error. SSRC = "
<< packet->Ssrc() << ", sequence number "
<< packet->SequenceNumber();
}
size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send, size_t PacketRouter::TimeToSendPadding(size_t bytes_to_send,
const PacedPacketInfo& pacing_info) { const PacedPacketInfo& pacing_info) {
size_t total_bytes_sent = 0; size_t total_bytes_sent = 0;

View File

@ -14,11 +14,13 @@
#include <stddef.h> #include <stddef.h>
#include <stdint.h> #include <stdint.h>
#include <list> #include <list>
#include <memory>
#include <vector> #include <vector>
#include "api/transport/network_types.h" #include "api/transport/network_types.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h" #include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -30,7 +32,7 @@ namespace rtcp {
class TransportFeedback; class TransportFeedback;
} // namespace rtcp } // namespace rtcp
// PacketRouter keeps track of RTP send modules to support the pacer. // PacketRouter keeps track of rtp send modules to support the pacer.
// In addition, it handles feedback messages, which are sent on a send // In addition, it handles feedback messages, which are sent on a send
// module if possible (sender report), otherwise on receive module // module if possible (sender report), otherwise on receive module
// (receiver report). For the latter case, we also keep track of the // (receiver report). For the latter case, we also keep track of the
@ -56,6 +58,9 @@ class PacketRouter : public TransportSequenceNumberAllocator,
bool retransmission, bool retransmission,
const PacedPacketInfo& packet_info); const PacedPacketInfo& packet_info);
virtual void SendPacket(std::unique_ptr<RtpPacketToSend> packet,
const PacedPacketInfo& cluster_info);
virtual size_t TimeToSendPadding(size_t bytes, virtual size_t TimeToSendPadding(size_t bytes,
const PacedPacketInfo& packet_info); const PacedPacketInfo& packet_info);

View File

@ -18,36 +18,53 @@
namespace webrtc { namespace webrtc {
RoundRobinPacketQueue::Packet::Packet(RtpPacketSender::Priority priority, RoundRobinPacketQueue::QueuedPacket::QueuedPacket(const QueuedPacket& rhs) =
default;
RoundRobinPacketQueue::QueuedPacket::~QueuedPacket() = default;
RoundRobinPacketQueue::QueuedPacket::QueuedPacket(
int priority,
RtpPacketToSend::Type type,
uint32_t ssrc, uint32_t ssrc,
uint16_t seq_number, uint16_t seq_number,
int64_t capture_time_ms, int64_t capture_time_ms,
int64_t enqueue_time_ms, int64_t enqueue_time_ms,
size_t length_in_bytes, size_t length_in_bytes,
bool retransmission, bool retransmission,
uint64_t enqueue_order) uint64_t enqueue_order,
: priority(priority), std::multiset<int64_t>::iterator enqueue_time_it,
ssrc(ssrc), absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
sequence_number(seq_number), packet_it)
capture_time_ms(capture_time_ms), : type_(type),
enqueue_time_ms(enqueue_time_ms), priority_(priority),
sum_paused_ms(0), ssrc_(ssrc),
bytes(length_in_bytes), sequence_number_(seq_number),
retransmission(retransmission), capture_time_ms_(capture_time_ms),
enqueue_order(enqueue_order) {} enqueue_time_ms_(enqueue_time_ms),
bytes_(length_in_bytes),
retransmission_(retransmission),
enqueue_order_(enqueue_order),
enqueue_time_it_(enqueue_time_it),
packet_it_(packet_it) {}
RoundRobinPacketQueue::Packet::Packet(const Packet& other) = default; std::unique_ptr<RtpPacketToSend>
RoundRobinPacketQueue::QueuedPacket::ReleasePacket() {
return packet_it_ ? std::move(**packet_it_) : nullptr;
}
RoundRobinPacketQueue::Packet::~Packet() {} void RoundRobinPacketQueue::QueuedPacket::SubtractPauseTimeMs(
int64_t pause_time_sum_ms) {
enqueue_time_ms_ -= pause_time_sum_ms;
}
bool RoundRobinPacketQueue::Packet::operator<( bool RoundRobinPacketQueue::QueuedPacket::operator<(
const RoundRobinPacketQueue::Packet& other) const { const RoundRobinPacketQueue::QueuedPacket& other) const {
if (priority != other.priority) if (priority_ != other.priority_)
return priority > other.priority; return priority_ > other.priority_;
if (retransmission != other.retransmission) if (retransmission_ != other.retransmission_)
return other.retransmission; return other.retransmission_;
return enqueue_order > other.enqueue_order; return enqueue_order_ > other.enqueue_order_;
} }
RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {} RoundRobinPacketQueue::Stream::Stream() : bytes(0), ssrc(0) {}
@ -59,50 +76,41 @@ RoundRobinPacketQueue::RoundRobinPacketQueue(int64_t start_time_us)
RoundRobinPacketQueue::~RoundRobinPacketQueue() {} RoundRobinPacketQueue::~RoundRobinPacketQueue() {}
void RoundRobinPacketQueue::Push(const Packet& packet_to_insert) { void RoundRobinPacketQueue::Push(int priority,
Packet packet(packet_to_insert); RtpPacketToSend::Type type,
uint32_t ssrc,
auto stream_info_it = streams_.find(packet.ssrc); uint16_t seq_number,
if (stream_info_it == streams_.end()) { int64_t capture_time_ms,
stream_info_it = streams_.emplace(packet.ssrc, Stream()).first; int64_t enqueue_time_ms,
stream_info_it->second.priority_it = stream_priorities_.end(); size_t length_in_bytes,
stream_info_it->second.ssrc = packet.ssrc; bool retransmission,
} uint64_t enqueue_order) {
Push(QueuedPacket(priority, type, ssrc, seq_number, capture_time_ms,
Stream* stream = &stream_info_it->second; enqueue_time_ms, length_in_bytes, retransmission,
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
if (stream->priority_it == stream_priorities_.end()) { absl::nullopt));
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
} else if (packet.priority < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that
// RtpPacketSender::Priority uses lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority, stream->bytes), packet.ssrc);
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
packet.enqueue_time_it = enqueue_times_.insert(packet.enqueue_time_ms);
// In order to figure out how much time a packet has spent in the queue while
// not in a paused state, we subtract the total amount of time the queue has
// been paused so far, and when the packet is poped we subtract the total
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
UpdateQueueTime(packet.enqueue_time_ms);
packet.enqueue_time_ms -= pause_time_sum_ms_;
stream->packet_queue.push(packet);
size_packets_ += 1;
size_bytes_ += packet.bytes;
} }
const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() { void RoundRobinPacketQueue::Push(int priority,
int64_t enqueue_time_ms,
uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet) {
uint32_t ssrc = packet->Ssrc();
uint16_t sequence_number = packet->SequenceNumber();
int64_t capture_time_ms = packet->capture_time_ms();
size_t size_bytes = packet->payload_size();
auto type = packet->packet_type();
RTC_DCHECK(type.has_value());
rtp_packets_.push_front(std::move(packet));
Push(QueuedPacket(priority, *type, ssrc, sequence_number, capture_time_ms,
enqueue_time_ms, size_bytes,
*type == RtpPacketToSend::Type::kRetransmission,
enqueue_order, enqueue_times_.insert(enqueue_time_ms),
rtp_packets_.begin()));
}
RoundRobinPacketQueue::QueuedPacket* RoundRobinPacketQueue::BeginPop() {
RTC_CHECK(!pop_packet_ && !pop_stream_); RTC_CHECK(!pop_packet_ && !pop_stream_);
Stream* stream = GetHighestPriorityStream(); Stream* stream = GetHighestPriorityStream();
@ -110,22 +118,22 @@ const RoundRobinPacketQueue::Packet& RoundRobinPacketQueue::BeginPop() {
pop_packet_.emplace(stream->packet_queue.top()); pop_packet_.emplace(stream->packet_queue.top());
stream->packet_queue.pop(); stream->packet_queue.pop();
return *pop_packet_; return &pop_packet_.value();
} }
void RoundRobinPacketQueue::CancelPop(const Packet& packet) { void RoundRobinPacketQueue::CancelPop() {
RTC_CHECK(pop_packet_ && pop_stream_); RTC_CHECK(pop_packet_ && pop_stream_);
(*pop_stream_)->packet_queue.push(*pop_packet_); (*pop_stream_)->packet_queue.push(*pop_packet_);
pop_packet_.reset(); pop_packet_.reset();
pop_stream_.reset(); pop_stream_.reset();
} }
void RoundRobinPacketQueue::FinalizePop(const Packet& packet) { void RoundRobinPacketQueue::FinalizePop() {
if (!Empty()) { if (!Empty()) {
RTC_CHECK(pop_packet_ && pop_stream_); RTC_CHECK(pop_packet_ && pop_stream_);
Stream* stream = *pop_stream_; Stream* stream = *pop_stream_;
stream_priorities_.erase(stream->priority_it); stream_priorities_.erase(stream->priority_it);
const Packet& packet = *pop_packet_; const QueuedPacket& packet = *pop_packet_;
// Calculate the total amount of time spent by this packet in the queue // Calculate the total amount of time spent by this packet in the queue
// while in a non-paused state. Note that the |pause_time_sum_ms_| was // while in a non-paused state. Note that the |pause_time_sum_ms_| was
@ -133,11 +141,16 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
// by subtracting it now we effectively remove the time spent in in the // by subtracting it now we effectively remove the time spent in in the
// queue while in a paused state. // queue while in a paused state.
int64_t time_in_non_paused_state_ms = int64_t time_in_non_paused_state_ms =
time_last_updated_ms_ - packet.enqueue_time_ms - pause_time_sum_ms_; time_last_updated_ms_ - packet.enqueue_time_ms() - pause_time_sum_ms_;
queue_time_sum_ms_ -= time_in_non_paused_state_ms; queue_time_sum_ms_ -= time_in_non_paused_state_ms;
RTC_CHECK(packet.enqueue_time_it != enqueue_times_.end()); RTC_CHECK(packet.EnqueueTimeIterator() != enqueue_times_.end());
enqueue_times_.erase(packet.enqueue_time_it); enqueue_times_.erase(packet.EnqueueTimeIterator());
auto packet_it = packet.PacketIterator();
if (packet_it) {
rtp_packets_.erase(*packet_it);
}
// Update |bytes| of this stream. The general idea is that the stream that // Update |bytes| of this stream. The general idea is that the stream that
// has sent the least amount of bytes should have the highest priority. // has sent the least amount of bytes should have the highest priority.
@ -145,11 +158,11 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
// case a "budget" will be built up for the stream sending at the lower // case a "budget" will be built up for the stream sending at the lower
// rate. To avoid building a too large budget we limit |bytes| to be within // rate. To avoid building a too large budget we limit |bytes| to be within
// kMaxLeading bytes of the stream that has sent the most amount of bytes. // kMaxLeading bytes of the stream that has sent the most amount of bytes.
stream->bytes = stream->bytes = std::max(stream->bytes + packet.size_in_bytes(),
std::max(stream->bytes + packet.bytes, max_bytes_ - kMaxLeadingBytes); max_bytes_ - kMaxLeadingBytes);
max_bytes_ = std::max(max_bytes_, stream->bytes); max_bytes_ = std::max(max_bytes_, stream->bytes);
size_bytes_ -= packet.bytes; size_bytes_ -= packet.size_in_bytes();
size_packets_ -= 1; size_packets_ -= 1;
RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0); RTC_CHECK(size_packets_ > 0 || queue_time_sum_ms_ == 0);
@ -158,7 +171,7 @@ void RoundRobinPacketQueue::FinalizePop(const Packet& packet) {
if (stream->packet_queue.empty()) { if (stream->packet_queue.empty()) {
stream->priority_it = stream_priorities_.end(); stream->priority_it = stream_priorities_.end();
} else { } else {
RtpPacketSender::Priority priority = stream->packet_queue.top().priority; int priority = stream->packet_queue.top().priority();
stream->priority_it = stream_priorities_.emplace( stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(priority, stream->bytes), stream->ssrc); StreamPrioKey(priority, stream->bytes), stream->ssrc);
} }
@ -218,6 +231,46 @@ int64_t RoundRobinPacketQueue::AverageQueueTimeMs() const {
return queue_time_sum_ms_ / size_packets_; return queue_time_sum_ms_ / size_packets_;
} }
void RoundRobinPacketQueue::Push(QueuedPacket packet) {
auto stream_info_it = streams_.find(packet.ssrc());
if (stream_info_it == streams_.end()) {
stream_info_it = streams_.emplace(packet.ssrc(), Stream()).first;
stream_info_it->second.priority_it = stream_priorities_.end();
stream_info_it->second.ssrc = packet.ssrc();
}
Stream* stream = &stream_info_it->second;
if (stream->priority_it == stream_priorities_.end()) {
// If the SSRC is not currently scheduled, add it to |stream_priorities_|.
RTC_CHECK(!IsSsrcScheduled(stream->ssrc));
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
} else if (packet.priority() < stream->priority_it->first.priority) {
// If the priority of this SSRC increased, remove the outdated StreamPrioKey
// and insert a new one with the new priority. Note that |priority_| uses
// lower ordinal for higher priority.
stream_priorities_.erase(stream->priority_it);
stream->priority_it = stream_priorities_.emplace(
StreamPrioKey(packet.priority(), stream->bytes), packet.ssrc());
}
RTC_CHECK(stream->priority_it != stream_priorities_.end());
// In order to figure out how much time a packet has spent in the queue while
// not in a paused state, we subtract the total amount of time the queue has
// been paused so far, and when the packet is popped we subtract the total
// amount of time the queue has been paused at that moment. This way we
// subtract the total amount of time the packet has spent in the queue while
// in a paused state.
UpdateQueueTime(packet.enqueue_time_ms());
packet.SubtractPauseTimeMs(pause_time_sum_ms_);
size_packets_ += 1;
size_bytes_ += packet.size_in_bytes();
stream->packet_queue.push(packet);
}
RoundRobinPacketQueue::Stream* RoundRobinPacketQueue::Stream*
RoundRobinPacketQueue::GetHighestPriorityStream() { RoundRobinPacketQueue::GetHighestPriorityStream() {
RTC_CHECK(!stream_priorities_.empty()); RTC_CHECK(!stream_priorities_.empty());

View File

@ -15,11 +15,13 @@
#include <stdint.h> #include <stdint.h>
#include <list> #include <list>
#include <map> #include <map>
#include <memory>
#include <queue> #include <queue>
#include <set> #include <set>
#include "absl/types/optional.h" #include "absl/types/optional.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "system_wrappers/include/clock.h" #include "system_wrappers/include/clock.h"
namespace webrtc { namespace webrtc {
@ -29,8 +31,66 @@ class RoundRobinPacketQueue {
explicit RoundRobinPacketQueue(int64_t start_time_us); explicit RoundRobinPacketQueue(int64_t start_time_us);
~RoundRobinPacketQueue(); ~RoundRobinPacketQueue();
struct Packet { struct QueuedPacket {
Packet(RtpPacketSender::Priority priority, public:
QueuedPacket(
int priority,
RtpPacketToSend::Type type,
uint32_t ssrc,
uint16_t seq_number,
int64_t capture_time_ms,
int64_t enqueue_time_ms,
size_t length_in_bytes,
bool retransmission,
uint64_t enqueue_order,
std::multiset<int64_t>::iterator enqueue_time_it,
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it);
QueuedPacket(const QueuedPacket& rhs);
~QueuedPacket();
bool operator<(const QueuedPacket& other) const;
int priority() const { return priority_; }
RtpPacketToSend::Type type() const { return type_; }
uint32_t ssrc() const { return ssrc_; }
uint16_t sequence_number() const { return sequence_number_; }
int64_t capture_time_ms() const { return capture_time_ms_; }
int64_t enqueue_time_ms() const { return enqueue_time_ms_; }
size_t size_in_bytes() const { return bytes_; }
bool is_retransmission() const { return retransmission_; }
uint64_t enqueue_order() const { return enqueue_order_; }
std::unique_ptr<RtpPacketToSend> ReleasePacket();
// For internal use.
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
PacketIterator() const {
return packet_it_;
}
std::multiset<int64_t>::iterator EnqueueTimeIterator() const {
return enqueue_time_it_;
}
void SubtractPauseTimeMs(int64_t pause_time_sum_ms);
private:
RtpPacketToSend::Type type_;
int priority_;
uint32_t ssrc_;
uint16_t sequence_number_;
int64_t capture_time_ms_; // Absolute time of frame capture.
int64_t enqueue_time_ms_; // Absolute time of pacer queue entry.
size_t bytes_;
bool retransmission_;
uint64_t enqueue_order_;
std::multiset<int64_t>::iterator enqueue_time_it_;
// Iterator into |rtp_packets_| where the memory for RtpPacket is owned,
// if applicable.
absl::optional<std::list<std::unique_ptr<RtpPacketToSend>>::iterator>
packet_it_;
};
void Push(int priority,
RtpPacketToSend::Type type,
uint32_t ssrc, uint32_t ssrc,
uint16_t seq_number, uint16_t seq_number,
int64_t capture_time_ms, int64_t capture_time_ms,
@ -38,27 +98,13 @@ class RoundRobinPacketQueue {
size_t length_in_bytes, size_t length_in_bytes,
bool retransmission, bool retransmission,
uint64_t enqueue_order); uint64_t enqueue_order);
Packet(const Packet& other); void Push(int priority,
virtual ~Packet(); int64_t enqueue_time_ms,
bool operator<(const Packet& other) const; uint64_t enqueue_order,
std::unique_ptr<RtpPacketToSend> packet);
RtpPacketSender::Priority priority; QueuedPacket* BeginPop();
uint32_t ssrc; void CancelPop();
uint16_t sequence_number; void FinalizePop();
int64_t capture_time_ms; // Absolute time of frame capture.
int64_t enqueue_time_ms; // Absolute time of pacer queue entry.
int64_t sum_paused_ms;
size_t bytes;
bool retransmission;
uint64_t enqueue_order;
std::list<Packet>::iterator this_it;
std::multiset<int64_t>::iterator enqueue_time_it;
};
void Push(const Packet& packet);
const Packet& BeginPop();
void CancelPop(const Packet& packet);
void FinalizePop(const Packet& packet);
bool Empty() const; bool Empty() const;
size_t SizeInPackets() const; size_t SizeInPackets() const;
@ -71,7 +117,7 @@ class RoundRobinPacketQueue {
private: private:
struct StreamPrioKey { struct StreamPrioKey {
StreamPrioKey(RtpPacketSender::Priority priority, int64_t bytes) StreamPrioKey(int priority, int64_t bytes)
: priority(priority), bytes(bytes) {} : priority(priority), bytes(bytes) {}
bool operator<(const StreamPrioKey& other) const { bool operator<(const StreamPrioKey& other) const {
@ -80,7 +126,7 @@ class RoundRobinPacketQueue {
return bytes < other.bytes; return bytes < other.bytes;
} }
const RtpPacketSender::Priority priority; const int priority;
const size_t bytes; const size_t bytes;
}; };
@ -92,7 +138,7 @@ class RoundRobinPacketQueue {
size_t bytes; size_t bytes;
uint32_t ssrc; uint32_t ssrc;
std::priority_queue<Packet> packet_queue; std::priority_queue<QueuedPacket> packet_queue;
// Whenever a packet is inserted for this stream we check if |priority_it| // Whenever a packet is inserted for this stream we check if |priority_it|
// points to an element in |stream_priorities_|, and if it does it means // points to an element in |stream_priorities_|, and if it does it means
@ -104,13 +150,15 @@ class RoundRobinPacketQueue {
static constexpr size_t kMaxLeadingBytes = 1400; static constexpr size_t kMaxLeadingBytes = 1400;
void Push(QueuedPacket packet);
Stream* GetHighestPriorityStream(); Stream* GetHighestPriorityStream();
// Just used to verify correctness. // Just used to verify correctness.
bool IsSsrcScheduled(uint32_t ssrc) const; bool IsSsrcScheduled(uint32_t ssrc) const;
int64_t time_last_updated_ms_; int64_t time_last_updated_ms_;
absl::optional<Packet> pop_packet_; absl::optional<QueuedPacket> pop_packet_;
absl::optional<Stream*> pop_stream_; absl::optional<Stream*> pop_stream_;
bool paused_ = false; bool paused_ = false;
@ -132,6 +180,12 @@ class RoundRobinPacketQueue {
// The enqueue time of every packet currently in the queue. Used to figure out // The enqueue time of every packet currently in the queue. Used to figure out
// the age of the oldest packet in the queue. // the age of the oldest packet in the queue.
std::multiset<int64_t> enqueue_times_; std::multiset<int64_t> enqueue_times_;
// List of RTP packets to be sent, not necessarily in the order they will be
// sent. PacketInfo.packet_it will point to an entry in this list, or the
// end iterator of this list if queue does not have direct ownership of the
// packet.
std::list<std::unique_ptr<RtpPacketToSend>> rtp_packets_;
}; };
} // namespace webrtc } // namespace webrtc

View File

@ -23,10 +23,10 @@ namespace webrtc {
// TODO(bugs.webrtc.org/10633): Add things missing to this interface so that we // TODO(bugs.webrtc.org/10633): Add things missing to this interface so that we
// can use multiple different pacer implementations, and stop inheriting from // can use multiple different pacer implementations, and stop inheriting from
// RtpPacketSender. // RtpPacketSender.
class RtpPacketPacer : RtpPacketSender { class RtpPacketPacer : public RtpPacketSender {
public: public:
RtpPacketPacer() = default; RtpPacketPacer() = default;
~RtpPacketPacer() override; ~RtpPacketPacer() override = default;
// Insert packet into queue, for eventual transmission. Based on the type of // Insert packet into queue, for eventual transmission. Based on the type of
// the packet, it will prioritized and scheduled relative to other packets and // the packet, it will prioritized and scheduled relative to other packets and

View File

@ -26,6 +26,7 @@
#include "modules/rtp_rtcp/include/receive_statistics.h" #include "modules/rtp_rtcp/include/receive_statistics.h"
#include "modules/rtp_rtcp/include/report_block_data.h" #include "modules/rtp_rtcp/include/report_block_data.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h" #include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "modules/rtp_rtcp/source/rtp_sender.h" #include "modules/rtp_rtcp/source/rtp_sender.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/deprecation.h" #include "rtc_base/deprecation.h"