Migrate modules/pacing to webrtc::Mutex.

Bug: webrtc:11567
Change-Id: I4edded4bdad7c3d0be4c7cfa0d34219d942a467d
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176856
Reviewed-by: Erik Språng <sprang@webrtc.org>
Commit-Queue: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31494}
This commit is contained in:
Markus Handell
2020-06-10 15:12:15 +02:00
committed by Commit Bot
parent f7cba9f132
commit 1e79c9ba9c
5 changed files with 40 additions and 36 deletions

View File

@ -49,6 +49,7 @@ rtc_library("pacing") {
"../../rtc_base:rtc_base_approved", "../../rtc_base:rtc_base_approved",
"../../rtc_base:rtc_task_queue", "../../rtc_base:rtc_task_queue",
"../../rtc_base/experiments:field_trial_parser", "../../rtc_base/experiments:field_trial_parser",
"../../rtc_base/synchronization:mutex",
"../../rtc_base/synchronization:sequence_checker", "../../rtc_base/synchronization:sequence_checker",
"../../rtc_base/task_utils:to_queued_task", "../../rtc_base/task_utils:to_queued_task",
"../../system_wrappers", "../../system_wrappers",

View File

@ -55,7 +55,7 @@ PacketRouter::~PacketRouter() {
void PacketRouter::AddSendRtpModule(RtpRtcpInterface* rtp_module, void PacketRouter::AddSendRtpModule(RtpRtcpInterface* rtp_module,
bool remb_candidate) { bool remb_candidate) {
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
AddSendRtpModuleToMap(rtp_module, rtp_module->SSRC()); AddSendRtpModuleToMap(rtp_module, rtp_module->SSRC());
if (absl::optional<uint32_t> rtx_ssrc = rtp_module->RtxSsrc()) { if (absl::optional<uint32_t> rtx_ssrc = rtp_module->RtxSsrc()) {
@ -97,7 +97,7 @@ void PacketRouter::RemoveSendRtpModuleFromMap(uint32_t ssrc) {
} }
void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) { void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
MaybeRemoveRembModuleCandidate(rtp_module, /* media_sender = */ true); MaybeRemoveRembModuleCandidate(rtp_module, /* media_sender = */ true);
RemoveSendRtpModuleFromMap(rtp_module->SSRC()); RemoveSendRtpModuleFromMap(rtp_module->SSRC());
@ -115,7 +115,7 @@ void PacketRouter::RemoveSendRtpModule(RtpRtcpInterface* rtp_module) {
void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender, void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
bool remb_candidate) { bool remb_candidate) {
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
RTC_DCHECK(std::find(rtcp_feedback_senders_.begin(), RTC_DCHECK(std::find(rtcp_feedback_senders_.begin(),
rtcp_feedback_senders_.end(), rtcp_feedback_senders_.end(),
rtcp_sender) == rtcp_feedback_senders_.end()); rtcp_sender) == rtcp_feedback_senders_.end());
@ -129,7 +129,7 @@ void PacketRouter::AddReceiveRtpModule(RtcpFeedbackSenderInterface* rtcp_sender,
void PacketRouter::RemoveReceiveRtpModule( void PacketRouter::RemoveReceiveRtpModule(
RtcpFeedbackSenderInterface* rtcp_sender) { RtcpFeedbackSenderInterface* rtcp_sender) {
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
MaybeRemoveRembModuleCandidate(rtcp_sender, /* media_sender = */ false); MaybeRemoveRembModuleCandidate(rtcp_sender, /* media_sender = */ false);
auto it = std::find(rtcp_feedback_senders_.begin(), auto it = std::find(rtcp_feedback_senders_.begin(),
rtcp_feedback_senders_.end(), rtcp_sender); rtcp_feedback_senders_.end(), rtcp_sender);
@ -143,7 +143,7 @@ void PacketRouter::SendPacket(std::unique_ptr<RtpPacketToSend> packet,
"sequence_number", packet->SequenceNumber(), "rtp_timestamp", "sequence_number", packet->SequenceNumber(), "rtp_timestamp",
packet->Timestamp()); packet->Timestamp());
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
// With the new pacer code path, transport sequence numbers are only set here, // With the new pacer code path, transport sequence numbers are only set here,
// on the pacer thread. Therefore we don't need atomics/synchronization. // on the pacer thread. Therefore we don't need atomics/synchronization.
if (packet->HasExtension<TransportSequenceNumber>()) { if (packet->HasExtension<TransportSequenceNumber>()) {
@ -178,7 +178,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("webrtc"), TRACE_EVENT1(TRACE_DISABLED_BY_DEFAULT("webrtc"),
"PacketRouter::GeneratePadding", "bytes", size.bytes()); "PacketRouter::GeneratePadding", "bytes", size.bytes());
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
// First try on the last rtp module to have sent media. This increases the // First try on the last rtp module to have sent media. This increases the
// the chance that any payload based padding will be useful as it will be // the chance that any payload based padding will be useful as it will be
// somewhat distributed over modules according the packet rate, even if it // somewhat distributed over modules according the packet rate, even if it
@ -219,7 +219,7 @@ std::vector<std::unique_ptr<RtpPacketToSend>> PacketRouter::GeneratePadding(
} }
uint16_t PacketRouter::CurrentTransportSequenceNumber() const { uint16_t PacketRouter::CurrentTransportSequenceNumber() const {
rtc::CritScope lock(&modules_crit_); MutexLock lock(&modules_mutex_);
return transport_seq_ & 0xFFFF; return transport_seq_ & 0xFFFF;
} }
@ -233,7 +233,7 @@ void PacketRouter::OnReceiveBitrateChanged(const std::vector<uint32_t>& ssrcs,
int64_t now_ms = rtc::TimeMillis(); int64_t now_ms = rtc::TimeMillis();
{ {
rtc::CritScope lock(&remb_crit_); MutexLock lock(&remb_mutex_);
// If we already have an estimate, check if the new total estimate is below // If we already have an estimate, check if the new total estimate is below
// kSendThresholdPercent of the previous estimate. // kSendThresholdPercent of the previous estimate.
@ -266,7 +266,7 @@ void PacketRouter::OnReceiveBitrateChanged(const std::vector<uint32_t>& ssrcs,
void PacketRouter::SetMaxDesiredReceiveBitrate(int64_t bitrate_bps) { void PacketRouter::SetMaxDesiredReceiveBitrate(int64_t bitrate_bps) {
RTC_DCHECK_GE(bitrate_bps, 0); RTC_DCHECK_GE(bitrate_bps, 0);
{ {
rtc::CritScope lock(&remb_crit_); MutexLock lock(&remb_mutex_);
max_bitrate_bps_ = bitrate_bps; max_bitrate_bps_ = bitrate_bps;
if (rtc::TimeMillis() - last_remb_time_ms_ < kRembSendIntervalMs && if (rtc::TimeMillis() - last_remb_time_ms_ < kRembSendIntervalMs &&
last_send_bitrate_bps_ > 0 && last_send_bitrate_bps_ > 0 &&
@ -280,7 +280,7 @@ void PacketRouter::SetMaxDesiredReceiveBitrate(int64_t bitrate_bps) {
bool PacketRouter::SendRemb(int64_t bitrate_bps, bool PacketRouter::SendRemb(int64_t bitrate_bps,
const std::vector<uint32_t>& ssrcs) { const std::vector<uint32_t>& ssrcs) {
rtc::CritScope lock(&modules_crit_); MutexLock lock(&modules_mutex_);
if (!active_remb_module_) { if (!active_remb_module_) {
return false; return false;
@ -295,7 +295,7 @@ bool PacketRouter::SendRemb(int64_t bitrate_bps,
bool PacketRouter::SendCombinedRtcpPacket( bool PacketRouter::SendCombinedRtcpPacket(
std::vector<std::unique_ptr<rtcp::RtcpPacket>> packets) { std::vector<std::unique_ptr<rtcp::RtcpPacket>> packets) {
rtc::CritScope cs(&modules_crit_); MutexLock lock(&modules_mutex_);
// Prefer send modules. // Prefer send modules.
for (RtpRtcpInterface* rtp_module : send_modules_list_) { for (RtpRtcpInterface* rtp_module : send_modules_list_) {

View File

@ -28,6 +28,7 @@
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
namespace webrtc { namespace webrtc {
@ -83,48 +84,49 @@ class PacketRouter : public RemoteBitrateObserver,
private: private:
void AddRembModuleCandidate(RtcpFeedbackSenderInterface* candidate_module, void AddRembModuleCandidate(RtcpFeedbackSenderInterface* candidate_module,
bool media_sender) bool media_sender)
RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
void MaybeRemoveRembModuleCandidate( void MaybeRemoveRembModuleCandidate(
RtcpFeedbackSenderInterface* candidate_module, RtcpFeedbackSenderInterface* candidate_module,
bool media_sender) RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); bool media_sender) RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
void UnsetActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); void UnsetActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
void DetermineActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); void DetermineActiveRembModule() RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
void AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module, uint32_t ssrc) void AddSendRtpModuleToMap(RtpRtcpInterface* rtp_module, uint32_t ssrc)
RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
void RemoveSendRtpModuleFromMap(uint32_t ssrc) void RemoveSendRtpModuleFromMap(uint32_t ssrc)
RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_crit_); RTC_EXCLUSIVE_LOCKS_REQUIRED(modules_mutex_);
rtc::CriticalSection modules_crit_; mutable Mutex modules_mutex_;
// Ssrc to RtpRtcpInterface module; // Ssrc to RtpRtcpInterface module;
std::unordered_map<uint32_t, RtpRtcpInterface*> send_modules_map_ std::unordered_map<uint32_t, RtpRtcpInterface*> send_modules_map_
RTC_GUARDED_BY(modules_crit_); RTC_GUARDED_BY(modules_mutex_);
std::list<RtpRtcpInterface*> send_modules_list_ RTC_GUARDED_BY(modules_crit_); std::list<RtpRtcpInterface*> send_modules_list_
RTC_GUARDED_BY(modules_mutex_);
// The last module used to send media. // The last module used to send media.
RtpRtcpInterface* last_send_module_ RTC_GUARDED_BY(modules_crit_); RtpRtcpInterface* last_send_module_ RTC_GUARDED_BY(modules_mutex_);
// Rtcp modules of the rtp receivers. // Rtcp modules of the rtp receivers.
std::vector<RtcpFeedbackSenderInterface*> rtcp_feedback_senders_ std::vector<RtcpFeedbackSenderInterface*> rtcp_feedback_senders_
RTC_GUARDED_BY(modules_crit_); RTC_GUARDED_BY(modules_mutex_);
// TODO(eladalon): remb_crit_ only ever held from one function, and it's not // TODO(eladalon): remb_mutex_ only ever held from one function, and it's not
// clear if that function can actually be called from more than one thread. // clear if that function can actually be called from more than one thread.
rtc::CriticalSection remb_crit_; Mutex remb_mutex_;
// The last time a REMB was sent. // The last time a REMB was sent.
int64_t last_remb_time_ms_ RTC_GUARDED_BY(remb_crit_); int64_t last_remb_time_ms_ RTC_GUARDED_BY(remb_mutex_);
int64_t last_send_bitrate_bps_ RTC_GUARDED_BY(remb_crit_); int64_t last_send_bitrate_bps_ RTC_GUARDED_BY(remb_mutex_);
// The last bitrate update. // The last bitrate update.
int64_t bitrate_bps_ RTC_GUARDED_BY(remb_crit_); int64_t bitrate_bps_ RTC_GUARDED_BY(remb_mutex_);
int64_t max_bitrate_bps_ RTC_GUARDED_BY(remb_crit_); int64_t max_bitrate_bps_ RTC_GUARDED_BY(remb_mutex_);
// Candidates for the REMB module can be RTP sender/receiver modules, with // Candidates for the REMB module can be RTP sender/receiver modules, with
// the sender modules taking precedence. // the sender modules taking precedence.
std::vector<RtcpFeedbackSenderInterface*> sender_remb_candidates_ std::vector<RtcpFeedbackSenderInterface*> sender_remb_candidates_
RTC_GUARDED_BY(modules_crit_); RTC_GUARDED_BY(modules_mutex_);
std::vector<RtcpFeedbackSenderInterface*> receiver_remb_candidates_ std::vector<RtcpFeedbackSenderInterface*> receiver_remb_candidates_
RTC_GUARDED_BY(modules_crit_); RTC_GUARDED_BY(modules_mutex_);
RtcpFeedbackSenderInterface* active_remb_module_ RtcpFeedbackSenderInterface* active_remb_module_
RTC_GUARDED_BY(modules_crit_); RTC_GUARDED_BY(modules_mutex_);
uint64_t transport_seq_ RTC_GUARDED_BY(modules_crit_); uint64_t transport_seq_ RTC_GUARDED_BY(modules_mutex_);
RTC_DISALLOW_COPY_AND_ASSIGN(PacketRouter); RTC_DISALLOW_COPY_AND_ASSIGN(PacketRouter);
}; };

View File

@ -188,7 +188,7 @@ TimeDelta TaskQueuePacedSender::OldestPacketWaitTime() const {
} }
void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) { void TaskQueuePacedSender::OnStatsUpdated(const Stats& stats) {
rtc::CritScope cs(&stats_crit_); MutexLock lock(&stats_mutex_);
current_stats_ = stats; current_stats_ = stats;
} }
@ -299,7 +299,7 @@ void TaskQueuePacedSender::MaybeUpdateStats(bool is_scheduled_call) {
} }
TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const { TaskQueuePacedSender::Stats TaskQueuePacedSender::GetStats() const {
rtc::CritScope cs(&stats_crit_); MutexLock lock(&stats_mutex_);
return current_stats_; return current_stats_;
} }

View File

@ -30,6 +30,7 @@
#include "modules/pacing/rtp_packet_pacer.h" #include "modules/pacing/rtp_packet_pacer.h"
#include "modules/rtp_rtcp/source/rtp_packet_to_send.h" #include "modules/rtp_rtcp/source/rtp_packet_to_send.h"
#include "rtc_base/critical_section.h" #include "rtc_base/critical_section.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h" #include "rtc_base/task_queue.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
@ -155,8 +156,8 @@ class TaskQueuePacedSender : public RtpPacketPacer, public RtpPacketSender {
// never drain. // never drain.
bool is_shutdown_ RTC_GUARDED_BY(task_queue_); bool is_shutdown_ RTC_GUARDED_BY(task_queue_);
rtc::CriticalSection stats_crit_; mutable Mutex stats_mutex_;
Stats current_stats_ RTC_GUARDED_BY(stats_crit_); Stats current_stats_ RTC_GUARDED_BY(stats_mutex_);
rtc::TaskQueue task_queue_; rtc::TaskQueue task_queue_;
}; };