Remove a timer from ModuleRtpRtcpImpl2 that runs 100 times a second.

The timer fired a Notify call that goes to an object that already
receives callbacks for every packet from RtpSenderEgress.

Further optimizations will be realized by moving ownership
of the stats to the worker thread and then be able to remove
locking in a few classes that currently are tied to those
variables and the callbacks that previously did not come
from the same thread consistently.

We could furthermore get rid of one of these callback interfaces
and just use one.

Bug: webrtc:11581
Change-Id: I56ca5893c0153a87a4cbbe87d7741c39f9e66e52
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/177422
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Tommi <tommi@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31575}
This commit is contained in:
Tomas Gunnarsson
2020-06-27 17:44:55 +02:00
committed by Commit Bot
parent 000953c8d1
commit 473bbd8131
6 changed files with 73 additions and 49 deletions

View File

@ -34,7 +34,6 @@ namespace webrtc {
namespace {
const int64_t kRtpRtcpMaxIdleTimeProcessMs = 5;
const int64_t kRtpRtcpRttProcessTimeMs = 1000;
const int64_t kRtpRtcpBitrateProcessTimeMs = 10;
const int64_t kDefaultExpectedRetransmissionTimeMs = 125;
} // namespace
@ -49,10 +48,10 @@ ModuleRtpRtcpImpl2::RtpSenderContext::RtpSenderContext(
config.paced_sender ? config.paced_sender : &non_paced_sender) {}
ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
: rtcp_sender_(configuration),
: worker_queue_(TaskQueueBase::Current()),
rtcp_sender_(configuration),
rtcp_receiver_(configuration, this),
clock_(configuration.clock),
last_bitrate_process_time_(clock_->TimeInMilliseconds()),
last_rtt_process_time_(clock_->TimeInMilliseconds()),
next_process_time_(clock_->TimeInMilliseconds() +
kRtpRtcpMaxIdleTimeProcessMs),
@ -62,6 +61,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
remote_bitrate_(configuration.remote_bitrate_estimator),
rtt_stats_(configuration.rtt_stats),
rtt_ms_(0) {
RTC_DCHECK(worker_queue_);
process_thread_checker_.Detach();
if (!configuration.receiver_only) {
rtp_sender_ = std::make_unique<RtpSenderContext>(configuration);
@ -78,7 +78,7 @@ ModuleRtpRtcpImpl2::ModuleRtpRtcpImpl2(const Configuration& configuration)
}
ModuleRtpRtcpImpl2::~ModuleRtpRtcpImpl2() {
RTC_DCHECK_RUN_ON(&construction_thread_checker_);
RTC_DCHECK_RUN_ON(worker_queue_);
}
// static
@ -105,18 +105,6 @@ void ModuleRtpRtcpImpl2::Process() {
// times a second.
next_process_time_ = now + kRtpRtcpMaxIdleTimeProcessMs;
if (rtp_sender_) {
if (now >= last_bitrate_process_time_ + kRtpRtcpBitrateProcessTimeMs) {
rtp_sender_->packet_sender.ProcessBitrateAndNotifyObservers();
last_bitrate_process_time_ = now;
// TODO(bugs.webrtc.org/11581): Is this a bug? At the top of the function,
// next_process_time_ is incremented by 5ms, here we effectively do a
// std::min() of (now + 5ms, now + 10ms). Seems like this is a no-op?
next_process_time_ =
std::min(next_process_time_, now + kRtpRtcpBitrateProcessTimeMs);
}
}
// TODO(bugs.webrtc.org/11581): We update the RTT once a second, whereas other
// things that run in this method are updated much more frequently. Move the
// RTT checking over to the worker thread, which matches better with where the

View File

@ -21,6 +21,7 @@
#include "absl/types/optional.h"
#include "api/rtp_headers.h"
#include "api/task_queue/task_queue_base.h"
#include "api/video/video_bitrate_allocation.h"
#include "modules/include/module_fec_types.h"
#include "modules/remote_bitrate_estimator/include/remote_bitrate_estimator.h"
@ -283,7 +284,7 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
bool TimeToSendFullNackList(int64_t now) const;
SequenceChecker construction_thread_checker_;
TaskQueueBase* const worker_queue_;
SequenceChecker process_thread_checker_;
std::unique_ptr<RtpSenderContext> rtp_sender_;
@ -293,7 +294,6 @@ class ModuleRtpRtcpImpl2 final : public RtpRtcpInterface,
Clock* const clock_;
int64_t last_bitrate_process_time_;
int64_t last_rtt_process_time_;
int64_t next_process_time_;
uint16_t packet_overhead_;

View File

@ -26,6 +26,8 @@ constexpr uint32_t kTimestampTicksPerMs = 90;
constexpr int kSendSideDelayWindowMs = 1000;
constexpr int kBitrateStatisticsWindowMs = 1000;
constexpr size_t kRtpSequenceNumberMapMaxEntries = 1 << 13;
constexpr TimeDelta kUpdateInterval =
TimeDelta::Millis(kBitrateStatisticsWindowMs);
bool IsEnabled(absl::string_view name,
const WebRtcKeyValueConfig* field_trials) {
@ -55,7 +57,8 @@ void RtpSenderEgress::NonPacedPacketSender::EnqueuePackets(
RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
RtpPacketHistory* packet_history)
: ssrc_(config.local_media_ssrc),
: worker_queue_(TaskQueueBase::Current()),
ssrc_(config.local_media_ssrc),
rtx_ssrc_(config.rtx_send_ssrc),
flexfec_ssrc_(config.fec_generator ? config.fec_generator->FecSsrc()
: absl::nullopt),
@ -85,7 +88,19 @@ RtpSenderEgress::RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
? std::make_unique<RtpSequenceNumberMap>(
kRtpSequenceNumberMapMaxEntries)
: nullptr) {
RTC_DCHECK(TaskQueueBase::Current());
RTC_DCHECK(worker_queue_);
if (bitrate_callback_) {
update_task_ = RepeatingTaskHandle::DelayedStart(worker_queue_,
kUpdateInterval, [this]() {
PeriodicUpdate();
return kUpdateInterval;
});
}
}
RtpSenderEgress::~RtpSenderEgress() {
RTC_DCHECK_RUN_ON(worker_queue_);
update_task_.Stop();
}
void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
@ -198,29 +213,20 @@ void RtpSenderEgress::SendPacket(RtpPacketToSend* packet,
if (send_success) {
rtc::CritScope lock(&lock_);
UpdateRtpStats(*packet);
// TODO(bugs.webrtc.org/11581): Update the stats on the worker thread
// (PostTask).
UpdateRtpStats(now_ms, *packet);
media_has_been_sent_ = true;
}
}
void RtpSenderEgress::ProcessBitrateAndNotifyObservers() {
if (!bitrate_callback_)
return;
rtc::CritScope lock(&lock_);
RtpSendRates send_rates = GetSendRatesLocked();
bitrate_callback_->Notify(
send_rates.Sum().bps(),
send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
}
RtpSendRates RtpSenderEgress::GetSendRates() const {
rtc::CritScope lock(&lock_);
return GetSendRatesLocked();
const int64_t now_ms = clock_->TimeInMilliseconds();
return GetSendRatesLocked(now_ms);
}
RtpSendRates RtpSenderEgress::GetSendRatesLocked() const {
const int64_t now_ms = clock_->TimeInMilliseconds();
RtpSendRates RtpSenderEgress::GetSendRatesLocked(int64_t now_ms) const {
RtpSendRates current_rates;
for (size_t i = 0; i < kNumMediaTypes; ++i) {
RtpPacketMediaType type = static_cast<RtpPacketMediaType>(i);
@ -232,6 +238,8 @@ RtpSendRates RtpSenderEgress::GetSendRatesLocked() const {
void RtpSenderEgress::GetDataCounters(StreamDataCounters* rtp_stats,
StreamDataCounters* rtx_stats) const {
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
rtc::CritScope lock(&lock_);
*rtp_stats = rtp_stats_;
*rtx_stats = rtx_rtp_stats_;
@ -436,9 +444,10 @@ bool RtpSenderEgress::SendPacketToNetwork(const RtpPacketToSend& packet,
return true;
}
void RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) {
int64_t now_ms = clock_->TimeInMilliseconds();
void RtpSenderEgress::UpdateRtpStats(int64_t now_ms,
const RtpPacketToSend& packet) {
// TODO(bugs.webrtc.org/11581): make sure rtx_rtp_stats_ and rtp_stats_ are
// only touched on the worker thread.
StreamDataCounters* counters =
packet.Ssrc() == rtx_ssrc_ ? &rtx_rtp_stats_ : &rtp_stats_;
@ -456,12 +465,34 @@ void RtpSenderEgress::UpdateRtpStats(const RtpPacketToSend& packet) {
counters->transmitted.AddPacket(packet);
RTC_DCHECK(packet.packet_type().has_value());
// TODO(bugs.webrtc.org/11581): send_rates_ should be touched only on the
// worker thread.
send_rates_[static_cast<size_t>(*packet.packet_type())].Update(packet.size(),
now_ms);
// TODO(bugs.webrtc.org/11581): These (stats related) stat callbacks should be
// issued on the worker thread.
if (rtp_stats_callback_) {
rtp_stats_callback_->DataCountersUpdated(*counters, packet.Ssrc());
}
// The bitrate_callback_ and rtp_stats_callback_ pointers in practice point
// to the same object, so these callbacks could be consolidated into one.
if (bitrate_callback_) {
RtpSendRates send_rates = GetSendRatesLocked(now_ms);
bitrate_callback_->Notify(
send_rates.Sum().bps(),
send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
}
}
void RtpSenderEgress::PeriodicUpdate() {
RTC_DCHECK_RUN_ON(worker_queue_);
RTC_DCHECK(bitrate_callback_);
RtpSendRates send_rates = GetSendRates();
bitrate_callback_->Notify(
send_rates.Sum().bps(),
send_rates[RtpPacketMediaType::kRetransmission].bps(), ssrc_);
}
} // namespace webrtc

View File

@ -18,6 +18,7 @@
#include "absl/types/optional.h"
#include "api/call/transport.h"
#include "api/rtc_event_log/rtc_event_log.h"
#include "api/task_queue/task_queue_base.h"
#include "api/units/data_rate.h"
#include "modules/rtp_rtcp/include/rtp_rtcp_defines.h"
#include "modules/rtp_rtcp/source/rtp_packet_history.h"
@ -26,6 +27,8 @@
#include "modules/rtp_rtcp/source/rtp_sequence_number_map.h"
#include "rtc_base/critical_section.h"
#include "rtc_base/rate_statistics.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/repeating_task.h"
#include "rtc_base/thread_annotations.h"
namespace webrtc {
@ -49,7 +52,7 @@ class RtpSenderEgress {
RtpSenderEgress(const RtpRtcpInterface::Configuration& config,
RtpPacketHistory* packet_history);
~RtpSenderEgress() = default;
~RtpSenderEgress();
void SendPacket(RtpPacketToSend* packet, const PacedPacketInfo& pacing_info)
RTC_LOCKS_EXCLUDED(lock_);
@ -57,7 +60,6 @@ class RtpSenderEgress {
absl::optional<uint32_t> RtxSsrc() const { return rtx_ssrc_; }
absl::optional<uint32_t> FlexFecSsrc() const { return flexfec_ssrc_; }
void ProcessBitrateAndNotifyObservers() RTC_LOCKS_EXCLUDED(lock_);
RtpSendRates GetSendRates() const RTC_LOCKS_EXCLUDED(lock_);
void GetDataCounters(StreamDataCounters* rtp_stats,
StreamDataCounters* rtx_stats) const
@ -84,7 +86,8 @@ class RtpSenderEgress {
// time.
typedef std::map<int64_t, int> SendDelayMap;
RtpSendRates GetSendRatesLocked() const RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
RtpSendRates GetSendRatesLocked(int64_t now_ms) const
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
bool HasCorrectSsrc(const RtpPacketToSend& packet) const;
void AddPacketToTransportFeedback(uint16_t packet_id,
const RtpPacketToSend& packet,
@ -100,9 +103,13 @@ class RtpSenderEgress {
bool SendPacketToNetwork(const RtpPacketToSend& packet,
const PacketOptions& options,
const PacedPacketInfo& pacing_info);
void UpdateRtpStats(const RtpPacketToSend& packet)
void UpdateRtpStats(int64_t now_ms, const RtpPacketToSend& packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(lock_);
// Called on a timer, once a second, on the worker_queue_.
void PeriodicUpdate();
TaskQueueBase* const worker_queue_;
const uint32_t ssrc_;
const absl::optional<uint32_t> rtx_ssrc_;
const absl::optional<uint32_t> flexfec_ssrc_;
@ -142,6 +149,8 @@ class RtpSenderEgress {
// 3. Whether the packet was the last in its frame.
const std::unique_ptr<RtpSequenceNumberMap> rtp_sequence_number_map_
RTC_GUARDED_BY(lock_);
RepeatingTaskHandle update_task_ RTC_GUARDED_BY(worker_queue_);
};
} // namespace webrtc

View File

@ -1774,9 +1774,6 @@ TEST_P(RtpSenderTest, BitrateCallbacks) {
RtpPacketHistory::StorageMode::kStoreAndCull, 1);
uint32_t ssrc = rtp_sender()->SSRC();
// Initial process call so we get a new time window.
rtp_egress()->ProcessBitrateAndNotifyObservers();
// Send a few frames.
RTPVideoHeader video_header;
for (uint32_t i = 0; i < kNumPackets; ++i) {
@ -1787,15 +1784,13 @@ TEST_P(RtpSenderTest, BitrateCallbacks) {
fake_clock_.AdvanceTimeMilliseconds(kPacketInterval);
}
rtp_egress()->ProcessBitrateAndNotifyObservers();
// We get one call for every stats updated, thus two calls since both the
// stream stats and the retransmit stats are updated once.
EXPECT_EQ(2u, callback.num_calls_);
EXPECT_EQ(kNumPackets, callback.num_calls_);
EXPECT_EQ(ssrc, callback.ssrc_);
const uint32_t kTotalPacketSize = kPacketOverhead + sizeof(payload);
// Bitrate measured over delta between last and first timestamp, plus one.
const uint32_t kExpectedWindowMs = kNumPackets * kPacketInterval + 1;
const uint32_t kExpectedWindowMs = (kNumPackets - 1) * kPacketInterval + 1;
const uint32_t kExpectedBitsAccumulated = kTotalPacketSize * kNumPackets * 8;
const uint32_t kExpectedRateBps =
(kExpectedBitsAccumulated * 1000 + (kExpectedWindowMs / 2)) /