Add a mutex free implementation of webrtc::ReceiveStatistics

The mutex is removed from the old existing implementation and instead a wrapper is implemented that ensure thread-safety.
Both the thread-safe and unsafe version share the same implementation of the logic.

There are two ways of construction:
webrtc::ReceiveStatistics::Create - thread-safe version.
webrtc::ReceiveStatistics::CreateUnLocked -thread-unsafe

Bug: none
Change-Id: Ica375919fda70180335c8f9ea666497811daf866
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/211240
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Commit-Queue: Per Kjellander <perkj@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33419}
This commit is contained in:
Per Kjellander
2021-03-10 12:31:38 +01:00
committed by Commit Bot
parent bb22066e60
commit ee8cd20ec5
4 changed files with 225 additions and 115 deletions

View File

@ -13,6 +13,7 @@
#include <cmath>
#include <cstdlib>
#include <memory>
#include <utility>
#include <vector>
#include "modules/remote_bitrate_estimator/test/bwe_test_logging.h"
@ -100,7 +101,6 @@ bool StreamStatisticianImpl::UpdateOutOfOrder(const RtpPacketReceived& packet,
}
void StreamStatisticianImpl::UpdateCounters(const RtpPacketReceived& packet) {
MutexLock lock(&stream_lock_);
RTC_DCHECK_EQ(ssrc_, packet.Ssrc());
int64_t now_ms = clock_->TimeInMilliseconds();
@ -159,17 +159,14 @@ void StreamStatisticianImpl::UpdateJitter(const RtpPacketReceived& packet,
void StreamStatisticianImpl::SetMaxReorderingThreshold(
int max_reordering_threshold) {
MutexLock lock(&stream_lock_);
max_reordering_threshold_ = max_reordering_threshold;
}
void StreamStatisticianImpl::EnableRetransmitDetection(bool enable) {
MutexLock lock(&stream_lock_);
enable_retransmit_detection_ = enable;
}
RtpReceiveStats StreamStatisticianImpl::GetStats() const {
MutexLock lock(&stream_lock_);
RtpReceiveStats stats;
stats.packets_lost = cumulative_loss_;
// TODO(nisse): Can we return a float instead?
@ -183,7 +180,6 @@ RtpReceiveStats StreamStatisticianImpl::GetStats() const {
bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
RtcpStatistics* statistics) {
MutexLock lock(&stream_lock_);
if (clock_->TimeInMilliseconds() - last_receive_time_ms_ >=
kStatisticsTimeoutMs) {
// Not active.
@ -192,9 +188,7 @@ bool StreamStatisticianImpl::GetActiveStatisticsAndReset(
if (!ReceivedRtpPacket()) {
return false;
}
*statistics = CalculateRtcpStatistics();
return true;
}
@ -241,7 +235,6 @@ RtcpStatistics StreamStatisticianImpl::CalculateRtcpStatistics() {
}
absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
MutexLock lock(&stream_lock_);
if (!ReceivedRtpPacket()) {
return absl::nullopt;
}
@ -257,12 +250,10 @@ absl::optional<int> StreamStatisticianImpl::GetFractionLostInPercent() const {
StreamDataCounters StreamStatisticianImpl::GetReceiveStreamDataCounters()
const {
MutexLock lock(&stream_lock_);
return receive_counters_;
}
uint32_t StreamStatisticianImpl::BitrateReceived() const {
MutexLock lock(&stream_lock_);
return incoming_bitrate_.Rate(clock_->TimeInMilliseconds()).value_or(0);
}
@ -295,21 +286,33 @@ bool StreamStatisticianImpl::IsRetransmitOfOldPacket(
}
std::unique_ptr<ReceiveStatistics> ReceiveStatistics::Create(Clock* clock) {
return std::make_unique<ReceiveStatisticsImpl>(clock);
return std::make_unique<ReceiveStatisticsLocked>(
clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
return std::make_unique<StreamStatisticianLocked>(
ssrc, clock, max_reordering_threshold);
});
}
ReceiveStatisticsImpl::ReceiveStatisticsImpl(Clock* clock)
std::unique_ptr<ReceiveStatistics> ReceiveStatistics::CreateThreadCompatible(
Clock* clock) {
return std::make_unique<ReceiveStatisticsImpl>(
clock, [](uint32_t ssrc, Clock* clock, int max_reordering_threshold) {
return std::make_unique<StreamStatisticianImpl>(
ssrc, clock, max_reordering_threshold);
});
}
ReceiveStatisticsImpl::ReceiveStatisticsImpl(
Clock* clock,
std::function<std::unique_ptr<StreamStatisticianImplInterface>(
uint32_t ssrc,
Clock* clock,
int max_reordering_threshold)> stream_statistician_factory)
: clock_(clock),
stream_statistician_factory_(std::move(stream_statistician_factory)),
last_returned_ssrc_(0),
max_reordering_threshold_(kDefaultMaxReorderingThreshold) {}
ReceiveStatisticsImpl::~ReceiveStatisticsImpl() {
while (!statisticians_.empty()) {
delete statisticians_.begin()->second;
statisticians_.erase(statisticians_.begin());
}
}
void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
// StreamStatisticianImpl instance is created once and only destroyed when
// this whole ReceiveStatisticsImpl is destroyed. StreamStatisticianImpl has
@ -318,34 +321,28 @@ void ReceiveStatisticsImpl::OnRtpPacket(const RtpPacketReceived& packet) {
GetOrCreateStatistician(packet.Ssrc())->UpdateCounters(packet);
}
StreamStatisticianImpl* ReceiveStatisticsImpl::GetStatistician(
StreamStatistician* ReceiveStatisticsImpl::GetStatistician(
uint32_t ssrc) const {
MutexLock lock(&receive_statistics_lock_);
const auto& it = statisticians_.find(ssrc);
if (it == statisticians_.end())
return NULL;
return it->second;
return nullptr;
return it->second.get();
}
StreamStatisticianImpl* ReceiveStatisticsImpl::GetOrCreateStatistician(
StreamStatisticianImplInterface* ReceiveStatisticsImpl::GetOrCreateStatistician(
uint32_t ssrc) {
MutexLock lock(&receive_statistics_lock_);
StreamStatisticianImpl*& impl = statisticians_[ssrc];
std::unique_ptr<StreamStatisticianImplInterface>& impl = statisticians_[ssrc];
if (impl == nullptr) { // new element
impl = new StreamStatisticianImpl(ssrc, clock_, max_reordering_threshold_);
impl =
stream_statistician_factory_(ssrc, clock_, max_reordering_threshold_);
}
return impl;
return impl.get();
}
void ReceiveStatisticsImpl::SetMaxReorderingThreshold(
int max_reordering_threshold) {
std::map<uint32_t, StreamStatisticianImpl*> statisticians;
{
MutexLock lock(&receive_statistics_lock_);
max_reordering_threshold_ = max_reordering_threshold;
statisticians = statisticians_;
}
for (auto& statistician : statisticians) {
max_reordering_threshold_ = max_reordering_threshold;
for (auto& statistician : statisticians_) {
statistician.second->SetMaxReorderingThreshold(max_reordering_threshold);
}
}
@ -364,15 +361,11 @@ void ReceiveStatisticsImpl::EnableRetransmitDetection(uint32_t ssrc,
std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
size_t max_blocks) {
std::map<uint32_t, StreamStatisticianImpl*> statisticians;
{
MutexLock lock(&receive_statistics_lock_);
statisticians = statisticians_;
}
std::vector<rtcp::ReportBlock> result;
result.reserve(std::min(max_blocks, statisticians.size()));
auto add_report_block = [&result](uint32_t media_ssrc,
StreamStatisticianImpl* statistician) {
result.reserve(std::min(max_blocks, statisticians_.size()));
auto add_report_block = [&result](
uint32_t media_ssrc,
StreamStatisticianImplInterface* statistician) {
// Do we have receive statistics to send?
RtcpStatistics stats;
if (!statistician->GetActiveStatisticsAndReset(&stats))
@ -390,13 +383,13 @@ std::vector<rtcp::ReportBlock> ReceiveStatisticsImpl::RtcpReportBlocks(
block.SetJitter(stats.jitter);
};
const auto start_it = statisticians.upper_bound(last_returned_ssrc_);
const auto start_it = statisticians_.upper_bound(last_returned_ssrc_);
for (auto it = start_it;
result.size() < max_blocks && it != statisticians.end(); ++it)
add_report_block(it->first, it->second);
for (auto it = statisticians.begin();
result.size() < max_blocks && it != statisticians_.end(); ++it)
add_report_block(it->first, it->second.get());
for (auto it = statisticians_.begin();
result.size() < max_blocks && it != start_it; ++it)
add_report_block(it->first, it->second);
add_report_block(it->first, it->second.get());
if (!result.empty())
last_returned_ssrc_ = result.back().source_ssrc();