Remove send_crit_, bitrate_crit_ and last_bandwidth_bps_crit_ locks.

...from the Call class.

With minimal shifting of ownership of a few variables, we can
maintain their state fully on the worker thread, where they're used.

This change also removes the dependency on RWLockWrapper.


Bug: webrtc:11612
Change-Id: Ia14be5bd6b50bd0b32d04f078b1e283080c00a19
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/176122
Commit-Queue: Tommi <tommi@webrtc.org>
Reviewed-by: Markus Handell <handellm@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#31360}
This commit is contained in:
Tommi
2020-05-26 19:35:16 +02:00
committed by Commit Bot
parent ff84d86d9c
commit 0d4647dffe
2 changed files with 110 additions and 151 deletions

View File

@ -280,8 +280,8 @@ rtc_library("call") {
"../rtc_base:safe_minmax", "../rtc_base:safe_minmax",
"../rtc_base/experiments:field_trial_parser", "../rtc_base/experiments:field_trial_parser",
"../rtc_base/network:sent_packet", "../rtc_base/network:sent_packet",
"../rtc_base/synchronization:rw_lock_wrapper",
"../rtc_base/synchronization:sequence_checker", "../rtc_base/synchronization:sequence_checker",
"../rtc_base/task_utils:pending_task_safety_flag",
"../system_wrappers", "../system_wrappers",
"../system_wrappers:field_trial", "../system_wrappers:field_trial",
"../system_wrappers:metrics", "../system_wrappers:metrics",

View File

@ -49,8 +49,8 @@
#include "rtc_base/location.h" #include "rtc_base/location.h"
#include "rtc_base/logging.h" #include "rtc_base/logging.h"
#include "rtc_base/strings/string_builder.h" #include "rtc_base/strings/string_builder.h"
#include "rtc_base/synchronization/rw_lock_wrapper.h"
#include "rtc_base/synchronization/sequence_checker.h" #include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_utils/pending_task_safety_flag.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
#include "rtc_base/time_utils.h" #include "rtc_base/time_utils.h"
#include "rtc_base/trace_event.h" #include "rtc_base/trace_event.h"
@ -244,20 +244,20 @@ class Call final : public webrtc::Call,
DeliveryStatus DeliverRtcp(MediaType media_type, DeliveryStatus DeliverRtcp(MediaType media_type,
const uint8_t* packet, const uint8_t* packet,
size_t length) size_t length)
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_); RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
DeliveryStatus DeliverRtp(MediaType media_type, DeliveryStatus DeliverRtp(MediaType media_type,
rtc::CopyOnWriteBuffer packet, rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) int64_t packet_time_us)
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_); RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void ConfigureSync(const std::string& sync_group) void ConfigureSync(const std::string& sync_group)
RTC_EXCLUSIVE_LOCKS_REQUIRED(configuration_sequence_checker_); RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet, void NotifyBweOfReceivedPacket(const RtpPacketReceived& packet,
MediaType media_type) MediaType media_type)
RTC_SHARED_LOCKS_REQUIRED(configuration_sequence_checker_); RTC_SHARED_LOCKS_REQUIRED(worker_thread_);
void UpdateSendHistograms(Timestamp first_sent_packet) void UpdateSendHistograms(Timestamp first_sent_packet)
RTC_EXCLUSIVE_LOCKS_REQUIRED(&bitrate_crit_); RTC_EXCLUSIVE_LOCKS_REQUIRED(worker_thread_);
void UpdateReceiveHistograms(); void UpdateReceiveHistograms();
void UpdateHistograms(); void UpdateHistograms();
void UpdateAggregateNetworkState(); void UpdateAggregateNetworkState();
@ -270,28 +270,28 @@ class Call final : public webrtc::Call,
Clock* const clock_; Clock* const clock_;
TaskQueueFactory* const task_queue_factory_; TaskQueueFactory* const task_queue_factory_;
TaskQueueBase* const worker_thread_;
const int num_cpu_cores_; const int num_cpu_cores_;
const rtc::scoped_refptr<SharedModuleThread> module_process_thread_; const rtc::scoped_refptr<SharedModuleThread> module_process_thread_;
const std::unique_ptr<CallStats> call_stats_; const std::unique_ptr<CallStats> call_stats_;
const std::unique_ptr<BitrateAllocator> bitrate_allocator_; const std::unique_ptr<BitrateAllocator> bitrate_allocator_;
Call::Config config_; Call::Config config_;
SequenceChecker configuration_sequence_checker_;
SequenceChecker network_sequence_checker_; SequenceChecker network_sequence_checker_;
NetworkState audio_network_state_; NetworkState audio_network_state_;
NetworkState video_network_state_; NetworkState video_network_state_;
bool aggregate_network_up_ RTC_GUARDED_BY(configuration_sequence_checker_); bool aggregate_network_up_ RTC_GUARDED_BY(worker_thread_);
// Audio, Video, and FlexFEC receive streams are owned by the client that // Audio, Video, and FlexFEC receive streams are owned by the client that
// creates them. // creates them.
std::set<AudioReceiveStream*> audio_receive_streams_ std::set<AudioReceiveStream*> audio_receive_streams_
RTC_GUARDED_BY(configuration_sequence_checker_); RTC_GUARDED_BY(worker_thread_);
std::set<VideoReceiveStream2*> video_receive_streams_ std::set<VideoReceiveStream2*> video_receive_streams_
RTC_GUARDED_BY(configuration_sequence_checker_); RTC_GUARDED_BY(worker_thread_);
std::map<std::string, AudioReceiveStream*> sync_stream_mapping_ std::map<std::string, AudioReceiveStream*> sync_stream_mapping_
RTC_GUARDED_BY(configuration_sequence_checker_); RTC_GUARDED_BY(worker_thread_);
// TODO(nisse): Should eventually be injected at creation, // TODO(nisse): Should eventually be injected at creation,
// with a single object in the bundled case. // with a single object in the bundled case.
@ -325,25 +325,22 @@ class Call final : public webrtc::Call,
const bool use_send_side_bwe; const bool use_send_side_bwe;
}; };
std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_ std::map<uint32_t, ReceiveRtpConfig> receive_rtp_config_
RTC_GUARDED_BY(configuration_sequence_checker_); RTC_GUARDED_BY(worker_thread_);
std::unique_ptr<RWLockWrapper> send_crit_;
// Audio and Video send streams are owned by the client that creates them. // Audio and Video send streams are owned by the client that creates them.
std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_ std::map<uint32_t, AudioSendStream*> audio_send_ssrcs_
RTC_GUARDED_BY(send_crit_); RTC_GUARDED_BY(worker_thread_);
std::map<uint32_t, VideoSendStream*> video_send_ssrcs_ std::map<uint32_t, VideoSendStream*> video_send_ssrcs_
RTC_GUARDED_BY(send_crit_); RTC_GUARDED_BY(worker_thread_);
std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(send_crit_); std::set<VideoSendStream*> video_send_streams_ RTC_GUARDED_BY(worker_thread_);
using RtpStateMap = std::map<uint32_t, RtpState>; using RtpStateMap = std::map<uint32_t, RtpState>;
RtpStateMap suspended_audio_send_ssrcs_ RtpStateMap suspended_audio_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
RTC_GUARDED_BY(configuration_sequence_checker_); RtpStateMap suspended_video_send_ssrcs_ RTC_GUARDED_BY(worker_thread_);
RtpStateMap suspended_video_send_ssrcs_
RTC_GUARDED_BY(configuration_sequence_checker_);
using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>; using RtpPayloadStateMap = std::map<uint32_t, RtpPayloadState>;
RtpPayloadStateMap suspended_video_payload_states_ RtpPayloadStateMap suspended_video_payload_states_
RTC_GUARDED_BY(configuration_sequence_checker_); RTC_GUARDED_BY(worker_thread_);
webrtc::RtcEventLog* event_log_; webrtc::RtcEventLog* event_log_;
@ -359,17 +356,14 @@ class Call final : public webrtc::Call,
absl::optional<int64_t> first_received_rtp_video_ms_; absl::optional<int64_t> first_received_rtp_video_ms_;
absl::optional<int64_t> last_received_rtp_video_ms_; absl::optional<int64_t> last_received_rtp_video_ms_;
rtc::CriticalSection last_bandwidth_bps_crit_; uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(worker_thread_);
uint32_t last_bandwidth_bps_ RTC_GUARDED_BY(&last_bandwidth_bps_crit_);
// TODO(holmer): Remove this lock once BitrateController no longer calls // TODO(holmer): Remove this lock once BitrateController no longer calls
// OnNetworkChanged from multiple threads. // OnNetworkChanged from multiple threads.
rtc::CriticalSection bitrate_crit_; uint32_t min_allocated_send_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
uint32_t min_allocated_send_bitrate_bps_ uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(worker_thread_);
RTC_GUARDED_BY(&network_sequence_checker_);
uint32_t configured_max_padding_bitrate_bps_ RTC_GUARDED_BY(&bitrate_crit_);
AvgCounter estimated_send_bitrate_kbps_counter_ AvgCounter estimated_send_bitrate_kbps_counter_
RTC_GUARDED_BY(&bitrate_crit_); RTC_GUARDED_BY(worker_thread_);
AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(&bitrate_crit_); AvgCounter pacer_bitrate_kbps_counter_ RTC_GUARDED_BY(worker_thread_);
ReceiveSideCongestionController receive_side_cc_; ReceiveSideCongestionController receive_side_cc_;
@ -378,6 +372,11 @@ class Call final : public webrtc::Call,
const std::unique_ptr<SendDelayStats> video_send_delay_stats_; const std::unique_ptr<SendDelayStats> video_send_delay_stats_;
const int64_t start_ms_; const int64_t start_ms_;
// Note that |task_safety_| needs to be at a greater scope than the task queue
// owned by |transport_send_| since calls might arrive on the network thread
// while Call is being deleted and the task queue is being torn down.
ScopedTaskSafety task_safety_;
// Caches transport_send_.get(), to avoid racing with destructor. // Caches transport_send_.get(), to avoid racing with destructor.
// Note that this is declared before transport_send_ to ensure that it is not // Note that this is declared before transport_send_ to ensure that it is not
// invalidated until no more tasks can be running on the transport_send_ task // invalidated until no more tasks can be running on the transport_send_ task
@ -387,8 +386,8 @@ class Call final : public webrtc::Call,
// last ensures that it is destroyed first and any running tasks are finished. // last ensures that it is destroyed first and any running tasks are finished.
std::unique_ptr<RtpTransportControllerSendInterface> transport_send_; std::unique_ptr<RtpTransportControllerSendInterface> transport_send_;
bool is_target_rate_observer_registered_ bool is_target_rate_observer_registered_ RTC_GUARDED_BY(worker_thread_) =
RTC_GUARDED_BY(&configuration_sequence_checker_) = false; false;
RTC_DISALLOW_COPY_AND_ASSIGN(Call); RTC_DISALLOW_COPY_AND_ASSIGN(Call);
}; };
@ -550,15 +549,15 @@ Call::Call(Clock* clock,
TaskQueueFactory* task_queue_factory) TaskQueueFactory* task_queue_factory)
: clock_(clock), : clock_(clock),
task_queue_factory_(task_queue_factory), task_queue_factory_(task_queue_factory),
worker_thread_(GetCurrentTaskQueueOrThread()),
num_cpu_cores_(CpuInfo::DetectNumberOfCores()), num_cpu_cores_(CpuInfo::DetectNumberOfCores()),
module_process_thread_(std::move(module_process_thread)), module_process_thread_(std::move(module_process_thread)),
call_stats_(new CallStats(clock_, GetCurrentTaskQueueOrThread())), call_stats_(new CallStats(clock_, worker_thread_)),
bitrate_allocator_(new BitrateAllocator(this)), bitrate_allocator_(new BitrateAllocator(this)),
config_(config), config_(config),
audio_network_state_(kNetworkDown), audio_network_state_(kNetworkDown),
video_network_state_(kNetworkDown), video_network_state_(kNetworkDown),
aggregate_network_up_(false), aggregate_network_up_(false),
send_crit_(RWLockWrapper::CreateRWLock()),
event_log_(config.event_log), event_log_(config.event_log),
received_bytes_per_second_counter_(clock_, nullptr, true), received_bytes_per_second_counter_(clock_, nullptr, true),
received_audio_bytes_per_second_counter_(clock_, nullptr, true), received_audio_bytes_per_second_counter_(clock_, nullptr, true),
@ -577,6 +576,7 @@ Call::Call(Clock* clock,
transport_send_(std::move(transport_send)) { transport_send_(std::move(transport_send)) {
RTC_DCHECK(config.event_log != nullptr); RTC_DCHECK(config.event_log != nullptr);
RTC_DCHECK(config.trials != nullptr); RTC_DCHECK(config.trials != nullptr);
RTC_DCHECK(worker_thread_->IsCurrent());
network_sequence_checker_.Detach(); network_sequence_checker_.Detach();
call_stats_->RegisterStatsObserver(&receive_side_cc_); call_stats_->RegisterStatsObserver(&receive_side_cc_);
@ -588,7 +588,7 @@ Call::Call(Clock* clock,
} }
Call::~Call() { Call::~Call() {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_CHECK(audio_send_ssrcs_.empty()); RTC_CHECK(audio_send_ssrcs_.empty());
RTC_CHECK(video_send_ssrcs_.empty()); RTC_CHECK(video_send_ssrcs_.empty());
@ -607,7 +607,6 @@ Call::~Call() {
// Only update histograms after process threads have been shut down, so that // Only update histograms after process threads have been shut down, so that
// they won't try to concurrently update stats. // they won't try to concurrently update stats.
if (first_sent_packet_ms) { if (first_sent_packet_ms) {
rtc::CritScope lock(&bitrate_crit_);
UpdateSendHistograms(*first_sent_packet_ms); UpdateSendHistograms(*first_sent_packet_ms);
} }
@ -616,7 +615,7 @@ Call::~Call() {
} }
void Call::RegisterRateObserver() { void Call::RegisterRateObserver() {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
if (is_target_rate_observer_registered_) if (is_target_rate_observer_registered_)
return; return;
@ -631,7 +630,7 @@ void Call::RegisterRateObserver() {
} }
void Call::SetClientBitratePreferences(const BitrateSettings& preferences) { void Call::SetClientBitratePreferences(const BitrateSettings& preferences) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
GetTransportControllerSend()->SetClientBitratePreferences(preferences); GetTransportControllerSend()->SetClientBitratePreferences(preferences);
} }
@ -713,14 +712,14 @@ void Call::UpdateReceiveHistograms() {
} }
PacketReceiver* Call::Receiver() { PacketReceiver* Call::Receiver() {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
return this; return this;
} }
webrtc::AudioSendStream* Call::CreateAudioSendStream( webrtc::AudioSendStream* Call::CreateAudioSendStream(
const webrtc::AudioSendStream::Config& config) { const webrtc::AudioSendStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream"); TRACE_EVENT0("webrtc", "Call::CreateAudioSendStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver(); RegisterRateObserver();
@ -739,12 +738,9 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
module_process_thread_->process_thread(), transport_send_ptr_, module_process_thread_->process_thread(), transport_send_ptr_,
bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(), bitrate_allocator_.get(), event_log_, call_stats_->AsRtcpRttStats(),
suspended_rtp_state); suspended_rtp_state);
{
WriteLockScoped write_lock(*send_crit_);
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) == RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
audio_send_ssrcs_.end()); audio_send_ssrcs_.end());
audio_send_ssrcs_[config.rtp.ssrc] = send_stream; audio_send_ssrcs_[config.rtp.ssrc] = send_stream;
}
for (AudioReceiveStream* stream : audio_receive_streams_) { for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == config.rtp.ssrc) { if (stream->config().rtp.local_ssrc == config.rtp.ssrc) {
@ -758,7 +754,7 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) { void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream"); TRACE_EVENT0("webrtc", "Call::DestroyAudioSendStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(send_stream != nullptr); RTC_DCHECK(send_stream != nullptr);
send_stream->Stop(); send_stream->Stop();
@ -767,11 +763,9 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
webrtc::internal::AudioSendStream* audio_send_stream = webrtc::internal::AudioSendStream* audio_send_stream =
static_cast<webrtc::internal::AudioSendStream*>(send_stream); static_cast<webrtc::internal::AudioSendStream*>(send_stream);
suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState(); suspended_audio_send_ssrcs_[ssrc] = audio_send_stream->GetRtpState();
{
WriteLockScoped write_lock(*send_crit_);
size_t num_deleted = audio_send_ssrcs_.erase(ssrc); size_t num_deleted = audio_send_ssrcs_.erase(ssrc);
RTC_DCHECK_EQ(1, num_deleted); RTC_DCHECK_EQ(1, num_deleted);
}
for (AudioReceiveStream* stream : audio_receive_streams_) { for (AudioReceiveStream* stream : audio_receive_streams_) {
if (stream->config().rtp.local_ssrc == ssrc) { if (stream->config().rtp.local_ssrc == ssrc) {
@ -786,7 +780,7 @@ void Call::DestroyAudioSendStream(webrtc::AudioSendStream* send_stream) {
webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream( webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
const webrtc::AudioReceiveStream::Config& config) { const webrtc::AudioReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream"); TRACE_EVENT0("webrtc", "Call::CreateAudioReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver(); RegisterRateObserver();
event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>( event_log_->Log(std::make_unique<RtcEventAudioReceiveStreamConfig>(
CreateRtcLogStreamConfig(config))); CreateRtcLogStreamConfig(config)));
@ -800,13 +794,11 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
ConfigureSync(config.sync_group); ConfigureSync(config.sync_group);
{
ReadLockScoped read_lock(*send_crit_);
auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc); auto it = audio_send_ssrcs_.find(config.rtp.local_ssrc);
if (it != audio_send_ssrcs_.end()) { if (it != audio_send_ssrcs_.end()) {
receive_stream->AssociateSendStream(it->second); receive_stream->AssociateSendStream(it->second);
} }
}
UpdateAggregateNetworkState(); UpdateAggregateNetworkState();
return receive_stream; return receive_stream;
} }
@ -814,7 +806,7 @@ webrtc::AudioReceiveStream* Call::CreateAudioReceiveStream(
void Call::DestroyAudioReceiveStream( void Call::DestroyAudioReceiveStream(
webrtc::AudioReceiveStream* receive_stream) { webrtc::AudioReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream"); TRACE_EVENT0("webrtc", "Call::DestroyAudioReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr); RTC_DCHECK(receive_stream != nullptr);
webrtc::internal::AudioReceiveStream* audio_receive_stream = webrtc::internal::AudioReceiveStream* audio_receive_stream =
static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream); static_cast<webrtc::internal::AudioReceiveStream*>(receive_stream);
@ -842,7 +834,7 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
VideoEncoderConfig encoder_config, VideoEncoderConfig encoder_config,
std::unique_ptr<FecController> fec_controller) { std::unique_ptr<FecController> fec_controller) {
TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream"); TRACE_EVENT0("webrtc", "Call::CreateVideoSendStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RegisterRateObserver(); RegisterRateObserver();
@ -865,14 +857,12 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_, std::move(config), std::move(encoder_config), suspended_video_send_ssrcs_,
suspended_video_payload_states_, std::move(fec_controller)); suspended_video_payload_states_, std::move(fec_controller));
{
WriteLockScoped write_lock(*send_crit_);
for (uint32_t ssrc : ssrcs) { for (uint32_t ssrc : ssrcs) {
RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end()); RTC_DCHECK(video_send_ssrcs_.find(ssrc) == video_send_ssrcs_.end());
video_send_ssrcs_[ssrc] = send_stream; video_send_ssrcs_[ssrc] = send_stream;
} }
video_send_streams_.insert(send_stream); video_send_streams_.insert(send_stream);
}
UpdateAggregateNetworkState(); UpdateAggregateNetworkState();
return send_stream; return send_stream;
@ -895,13 +885,12 @@ webrtc::VideoSendStream* Call::CreateVideoSendStream(
void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) { void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream"); TRACE_EVENT0("webrtc", "Call::DestroyVideoSendStream");
RTC_DCHECK(send_stream != nullptr); RTC_DCHECK(send_stream != nullptr);
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
send_stream->Stop(); send_stream->Stop();
VideoSendStream* send_stream_impl = nullptr; VideoSendStream* send_stream_impl = nullptr;
{
WriteLockScoped write_lock(*send_crit_);
auto it = video_send_ssrcs_.begin(); auto it = video_send_ssrcs_.begin();
while (it != video_send_ssrcs_.end()) { while (it != video_send_ssrcs_.end()) {
if (it->second == static_cast<VideoSendStream*>(send_stream)) { if (it->second == static_cast<VideoSendStream*>(send_stream)) {
@ -912,7 +901,7 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
} }
} }
video_send_streams_.erase(send_stream_impl); video_send_streams_.erase(send_stream_impl);
}
RTC_CHECK(send_stream_impl != nullptr); RTC_CHECK(send_stream_impl != nullptr);
VideoSendStream::RtpStateMap rtp_states; VideoSendStream::RtpStateMap rtp_states;
@ -933,7 +922,7 @@ void Call::DestroyVideoSendStream(webrtc::VideoSendStream* send_stream) {
webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream( webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
webrtc::VideoReceiveStream::Config configuration) { webrtc::VideoReceiveStream::Config configuration) {
TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream"); TRACE_EVENT0("webrtc", "Call::CreateVideoReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
receive_side_cc_.SetSendPeriodicFeedback( receive_side_cc_.SetSendPeriodicFeedback(
SendPeriodicFeedback(configuration.rtp.extensions)); SendPeriodicFeedback(configuration.rtp.extensions));
@ -970,7 +959,7 @@ webrtc::VideoReceiveStream* Call::CreateVideoReceiveStream(
void Call::DestroyVideoReceiveStream( void Call::DestroyVideoReceiveStream(
webrtc::VideoReceiveStream* receive_stream) { webrtc::VideoReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream"); TRACE_EVENT0("webrtc", "Call::DestroyVideoReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr); RTC_DCHECK(receive_stream != nullptr);
VideoReceiveStream2* receive_stream_impl = VideoReceiveStream2* receive_stream_impl =
static_cast<VideoReceiveStream2*>(receive_stream); static_cast<VideoReceiveStream2*>(receive_stream);
@ -995,7 +984,7 @@ void Call::DestroyVideoReceiveStream(
FlexfecReceiveStream* Call::CreateFlexfecReceiveStream( FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
const FlexfecReceiveStream::Config& config) { const FlexfecReceiveStream::Config& config) {
TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream"); TRACE_EVENT0("webrtc", "Call::CreateFlexfecReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RecoveredPacketReceiver* recovered_packet_receiver = this; RecoveredPacketReceiver* recovered_packet_receiver = this;
@ -1022,7 +1011,7 @@ FlexfecReceiveStream* Call::CreateFlexfecReceiveStream(
void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) { void Call::DestroyFlexfecReceiveStream(FlexfecReceiveStream* receive_stream) {
TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream"); TRACE_EVENT0("webrtc", "Call::DestroyFlexfecReceiveStream");
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RTC_DCHECK(receive_stream != nullptr); RTC_DCHECK(receive_stream != nullptr);
const FlexfecReceiveStream::Config& config = receive_stream->GetConfig(); const FlexfecReceiveStream::Config& config = receive_stream->GetConfig();
@ -1042,7 +1031,7 @@ RtpTransportControllerSendInterface* Call::GetTransportControllerSend() {
} }
Call::Stats Call::GetStats() const { Call::Stats Call::GetStats() const {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
// TODO(tommi): The following stats are managed on the process thread: // TODO(tommi): The following stats are managed on the process thread:
// - pacer_delay_ms (PacedSender::Process) // - pacer_delay_ms (PacedSender::Process)
@ -1066,22 +1055,14 @@ Call::Stats Call::GetStats() const {
receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate( receive_side_cc_.GetRemoteBitrateEstimator(false)->LatestEstimate(
&ssrcs, &recv_bandwidth); &ssrcs, &recv_bandwidth);
stats.recv_bandwidth_bps = recv_bandwidth; stats.recv_bandwidth_bps = recv_bandwidth;
{
rtc::CritScope cs(&last_bandwidth_bps_crit_);
stats.send_bandwidth_bps = last_bandwidth_bps_; stats.send_bandwidth_bps = last_bandwidth_bps_;
}
{
rtc::CritScope cs(&bitrate_crit_);
stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_; stats.max_padding_bitrate_bps = configured_max_padding_bitrate_bps_;
}
return stats; return stats;
} }
void Call::SignalChannelNetworkState(MediaType media, NetworkState state) { void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
switch (media) { switch (media) {
case MediaType::AUDIO: case MediaType::AUDIO:
audio_network_state_ = state; audio_network_state_ = state;
@ -1102,30 +1083,19 @@ void Call::SignalChannelNetworkState(MediaType media, NetworkState state) {
} }
void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) { void Call::OnAudioTransportOverheadChanged(int transport_overhead_per_packet) {
ReadLockScoped read_lock(*send_crit_); RTC_DCHECK_RUN_ON(worker_thread_);
for (auto& kv : audio_send_ssrcs_) { for (auto& kv : audio_send_ssrcs_) {
kv.second->SetTransportOverhead(transport_overhead_per_packet); kv.second->SetTransportOverhead(transport_overhead_per_packet);
} }
} }
void Call::UpdateAggregateNetworkState() { void Call::UpdateAggregateNetworkState() {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
bool have_audio = false; bool have_audio =
bool have_video = false; !audio_send_ssrcs_.empty() || !audio_receive_streams_.empty();
{ bool have_video =
ReadLockScoped read_lock(*send_crit_); !video_send_ssrcs_.empty() || !video_receive_streams_.empty();
if (!audio_send_ssrcs_.empty())
have_audio = true;
if (!video_send_ssrcs_.empty())
have_video = true;
}
if (!audio_receive_streams_.empty())
have_audio = true;
if (!video_receive_streams_.empty())
have_video = true;
bool aggregate_network_up = bool aggregate_network_up =
((have_video && video_network_state_ == kNetworkUp) || ((have_video && video_network_state_ == kNetworkUp) ||
@ -1159,42 +1129,32 @@ void Call::OnStartRateUpdate(DataRate start_rate) {
void Call::OnTargetTransferRate(TargetTransferRate msg) { void Call::OnTargetTransferRate(TargetTransferRate msg) {
RTC_DCHECK(network_queue()->IsCurrent()); RTC_DCHECK(network_queue()->IsCurrent());
RTC_DCHECK_RUN_ON(&network_sequence_checker_); RTC_DCHECK_RUN_ON(&network_sequence_checker_);
{
rtc::CritScope cs(&last_bandwidth_bps_crit_);
last_bandwidth_bps_ = msg.target_rate.bps();
}
uint32_t target_bitrate_bps = msg.target_rate.bps(); uint32_t target_bitrate_bps = msg.target_rate.bps();
// For controlling the rate of feedback messages. // For controlling the rate of feedback messages.
receive_side_cc_.OnBitrateChanged(target_bitrate_bps); receive_side_cc_.OnBitrateChanged(target_bitrate_bps);
bitrate_allocator_->OnNetworkEstimateChanged(msg); bitrate_allocator_->OnNetworkEstimateChanged(msg);
// Ignore updates if bitrate is zero (the aggregate network state is down). worker_thread_->PostTask(
if (target_bitrate_bps == 0) { ToQueuedTask(task_safety_, [this, target_bitrate_bps]() {
rtc::CritScope lock(&bitrate_crit_); RTC_DCHECK_RUN_ON(worker_thread_);
last_bandwidth_bps_ = target_bitrate_bps;
// Ignore updates if bitrate is zero (the aggregate network state is
// down) or if we're not sending video.
if (target_bitrate_bps == 0 || video_send_streams_.empty()) {
estimated_send_bitrate_kbps_counter_.ProcessAndPause(); estimated_send_bitrate_kbps_counter_.ProcessAndPause();
pacer_bitrate_kbps_counter_.ProcessAndPause(); pacer_bitrate_kbps_counter_.ProcessAndPause();
return; return;
} }
bool sending_video;
{
ReadLockScoped read_lock(*send_crit_);
sending_video = !video_send_streams_.empty();
}
rtc::CritScope lock(&bitrate_crit_);
if (!sending_video) {
// Do not update the stats if we are not sending video.
estimated_send_bitrate_kbps_counter_.ProcessAndPause();
pacer_bitrate_kbps_counter_.ProcessAndPause();
return;
}
estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000); estimated_send_bitrate_kbps_counter_.Add(target_bitrate_bps / 1000);
// Pacer bitrate may be higher than bitrate estimate if enforcing min bitrate. // Pacer bitrate may be higher than bitrate estimate if enforcing min
// bitrate.
uint32_t pacer_bitrate_bps = uint32_t pacer_bitrate_bps =
std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_); std::max(target_bitrate_bps, min_allocated_send_bitrate_bps_);
pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000); pacer_bitrate_kbps_counter_.Add(pacer_bitrate_bps / 1000);
}));
} }
void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) { void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
@ -1203,10 +1163,11 @@ void Call::OnAllocationLimitsChanged(BitrateAllocationLimits limits) {
transport_send_ptr_->SetAllocatedSendBitrateLimits(limits); transport_send_ptr_->SetAllocatedSendBitrateLimits(limits);
worker_thread_->PostTask(ToQueuedTask(task_safety_, [this, limits]() {
RTC_DCHECK_RUN_ON(worker_thread_);
min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps(); min_allocated_send_bitrate_bps_ = limits.min_allocatable_rate.bps();
rtc::CritScope lock(&bitrate_crit_);
configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps(); configured_max_padding_bitrate_bps_ = limits.max_padding_rate.bps();
}));
} }
void Call::ConfigureSync(const std::string& sync_group) { void Call::ConfigureSync(const std::string& sync_group) {
@ -1285,14 +1246,12 @@ PacketReceiver::DeliveryStatus Call::DeliverRtcp(MediaType media_type,
} }
} }
if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) { if (media_type == MediaType::ANY || media_type == MediaType::VIDEO) {
ReadLockScoped read_lock(*send_crit_);
for (VideoSendStream* stream : video_send_streams_) { for (VideoSendStream* stream : video_send_streams_) {
stream->DeliverRtcp(packet, length); stream->DeliverRtcp(packet, length);
rtcp_delivered = true; rtcp_delivered = true;
} }
} }
if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) { if (media_type == MediaType::ANY || media_type == MediaType::AUDIO) {
ReadLockScoped read_lock(*send_crit_);
for (auto& kv : audio_send_ssrcs_) { for (auto& kv : audio_send_ssrcs_) {
kv.second->DeliverRtcp(packet, length); kv.second->DeliverRtcp(packet, length);
rtcp_delivered = true; rtcp_delivered = true;
@ -1341,7 +1300,7 @@ PacketReceiver::DeliveryStatus Call::DeliverRtp(MediaType media_type,
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc(); << parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the // Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the |configuration_sequence_checker_|. // RtpDemuxer, is not protected by the |worker_thread_|.
// But deregistering in the |receive_rtp_config_| map is. So by not passing // But deregistering in the |receive_rtp_config_| map is. So by not passing
// the packet on to demuxing in this case, we prevent incoming packets to be // the packet on to demuxing in this case, we prevent incoming packets to be
// passed on via the demuxer to a receive stream which is being torned down. // passed on via the demuxer to a receive stream which is being torned down.
@ -1390,7 +1349,7 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
MediaType media_type, MediaType media_type,
rtc::CopyOnWriteBuffer packet, rtc::CopyOnWriteBuffer packet,
int64_t packet_time_us) { int64_t packet_time_us) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
if (IsRtcp(packet.cdata(), packet.size())) if (IsRtcp(packet.cdata(), packet.size()))
return DeliverRtcp(media_type, packet.cdata(), packet.size()); return DeliverRtcp(media_type, packet.cdata(), packet.size());
@ -1398,7 +1357,7 @@ PacketReceiver::DeliveryStatus Call::DeliverPacket(
} }
void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) { void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&configuration_sequence_checker_); RTC_DCHECK_RUN_ON(worker_thread_);
RtpPacketReceived parsed_packet; RtpPacketReceived parsed_packet;
if (!parsed_packet.Parse(packet, length)) if (!parsed_packet.Parse(packet, length))
return; return;
@ -1410,7 +1369,7 @@ void Call::OnRecoveredPacket(const uint8_t* packet, size_t length) {
RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc " RTC_LOG(LS_ERROR) << "receive_rtp_config_ lookup failed for ssrc "
<< parsed_packet.Ssrc(); << parsed_packet.Ssrc();
// Destruction of the receive stream, including deregistering from the // Destruction of the receive stream, including deregistering from the
// RtpDemuxer, is not protected by the |configuration_sequence_checker_|. // RtpDemuxer, is not protected by the |worker_thread_|.
// But deregistering in the |receive_rtp_config_| map is. // But deregistering in the |receive_rtp_config_| map is.
// So by not passing the packet on to demuxing in this case, we prevent // So by not passing the packet on to demuxing in this case, we prevent
// incoming packets to be passed on via the demuxer to a receive stream // incoming packets to be passed on via the demuxer to a receive stream