Reland "Fix data race for config_ in AudioSendStream"

This is a reland of 51e5c4b0f47926e2586d809e47dc60fe4812b782

It may happen that user will pass config with min bitrate > max bitrate.
In such case we can't generate cached_constraints and will crash before.
The reland will handle this situation gracefully.

Original change's description:
> Fix data race for config_ in AudioSendStream
>
> config_ was written and read on different threads without sync. This CL
> moves config access on worker_thread_ with all other required fields.
> It keeps only bitrate allocator accessed from worker_queue_, because
> it is used from it in other classes and supposed to be single threaded.
>
> Bug: None
> Change-Id: I23ece4dc8b09b41a8c589412bedd36d63b76cbc5
> Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/203267
> Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
> Reviewed-by: Niels Moller <nisse@webrtc.org>
> Reviewed-by: Per Åhgren <peah@webrtc.org>
> Reviewed-by: Harald Alvestrand <hta@webrtc.org>
> Commit-Queue: Artem Titov <titovartem@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#33125}

Bug: None
Change-Id: I274ff15208d69c25fb25a0f1dd0a0e37b72480b0
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/205523
Reviewed-by: Harald Alvestrand <hta@webrtc.org>
Reviewed-by: Danil Chapovalov <danilchap@webrtc.org>
Reviewed-by: Niels Moller <nisse@webrtc.org>
Reviewed-by: Per Åhgren <peah@webrtc.org>
Commit-Queue: Artem Titov <titovartem@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#33162}
This commit is contained in:
Artem Titov
2021-02-03 13:33:28 +01:00
committed by Commit Bot
parent 4ef5638871
commit a208861401
2 changed files with 127 additions and 90 deletions

View File

@ -168,13 +168,14 @@ AudioSendStream::AudioSendStream(
RTC_DCHECK(rtp_rtcp_module_);
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(config, true);
UpdateCachedTargetAudioBitrateConstraints();
pacer_thread_checker_.Detach();
}
AudioSendStream::~AudioSendStream() {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
RTC_LOG(LS_INFO) << "~AudioSendStream: " << config_.rtp.ssrc;
RTC_DCHECK(!sending_);
channel_send_->ResetSenderCongestionControlObjects();
@ -186,13 +187,13 @@ AudioSendStream::~AudioSendStream() {
}
const webrtc::AudioSendStream::Config& AudioSendStream::GetConfig() const {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
return config_;
}
void AudioSendStream::Reconfigure(
const webrtc::AudioSendStream::Config& new_config) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
ConfigureStream(new_config, false);
}
@ -351,20 +352,22 @@ void AudioSendStream::ConfigureStream(
}
channel_send_->CallEncoder([this](AudioEncoder* encoder) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!encoder) {
return;
}
worker_queue_->PostTask(
[this, length_range = encoder->GetFrameLengthRange()] {
RTC_DCHECK_RUN_ON(worker_queue_);
frame_length_range_ = length_range;
});
frame_length_range_ = encoder->GetFrameLengthRange();
UpdateCachedTargetAudioBitrateConstraints();
});
if (sending_) {
ReconfigureBitrateObserver(new_config);
}
config_ = new_config;
if (!first_time) {
UpdateCachedTargetAudioBitrateConstraints();
}
}
void AudioSendStream::Start() {
@ -379,13 +382,7 @@ void AudioSendStream::Start() {
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
rtp_rtcp_module_->SetAsPartOfAllocation(true);
rtc::Event thread_sync_event;
worker_queue_->PostTask([&] {
RTC_DCHECK_RUN_ON(worker_queue_);
ConfigureBitrateObserver();
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
} else {
rtp_rtcp_module_->SetAsPartOfAllocation(false);
}
@ -396,7 +393,7 @@ void AudioSendStream::Start() {
}
void AudioSendStream::Stop() {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
if (!sending_) {
return;
}
@ -431,14 +428,14 @@ bool AudioSendStream::SendTelephoneEvent(int payload_type,
int payload_frequency,
int event,
int duration_ms) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetSendTelephoneEventPayloadType(payload_type,
payload_frequency);
return channel_send_->SendTelephoneEventOutband(event, duration_ms);
}
void AudioSendStream::SetMuted(bool muted) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->SetInputMute(muted);
}
@ -448,7 +445,7 @@ webrtc::AudioSendStream::Stats AudioSendStream::GetStats() const {
webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
bool has_remote_tracks) const {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
webrtc::AudioSendStream::Stats stats;
stats.local_ssrc = config_.rtp.ssrc;
stats.target_bitrate_bps = channel_send_->GetBitrate();
@ -509,12 +506,14 @@ webrtc::AudioSendStream::Stats AudioSendStream::GetStats(
void AudioSendStream::DeliverRtcp(const uint8_t* packet, size_t length) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
channel_send_->ReceivedRTCPPacket(packet, length);
worker_queue_->PostTask([&]() {
{
// Poll if overhead has changed, which it can do if ack triggers us to stop
// sending mid/rid.
MutexLock lock(&overhead_per_packet_lock_);
UpdateOverheadForEncoder();
});
}
UpdateCachedTargetAudioBitrateConstraints();
}
uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
@ -523,9 +522,11 @@ uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
// Pick a target bitrate between the constraints. Overrules the allocator if
// it 1) allocated a bitrate of zero to disable the stream or 2) allocated a
// higher than max to allow for e.g. extra FEC.
auto constraints = GetMinMaxBitrateConstraints();
update.target_bitrate.Clamp(constraints.min, constraints.max);
update.stable_target_bitrate.Clamp(constraints.min, constraints.max);
RTC_DCHECK(cached_constraints_.has_value());
update.target_bitrate.Clamp(cached_constraints_->min,
cached_constraints_->max);
update.stable_target_bitrate.Clamp(cached_constraints_->min,
cached_constraints_->max);
channel_send_->OnBitrateAllocation(update);
@ -536,13 +537,17 @@ uint32_t AudioSendStream::OnBitrateUpdated(BitrateAllocationUpdate update) {
void AudioSendStream::SetTransportOverhead(
int transport_overhead_per_packet_bytes) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
{
MutexLock lock(&overhead_per_packet_lock_);
transport_overhead_per_packet_bytes_ = transport_overhead_per_packet_bytes;
UpdateOverheadForEncoder();
}
UpdateCachedTargetAudioBitrateConstraints();
}
void AudioSendStream::UpdateOverheadForEncoder() {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
size_t overhead_per_packet_bytes = GetPerPacketOverheadBytes();
if (overhead_per_packet_ == overhead_per_packet_bytes) {
return;
@ -552,20 +557,12 @@ void AudioSendStream::UpdateOverheadForEncoder() {
channel_send_->CallEncoder([&](AudioEncoder* encoder) {
encoder->OnReceivedOverhead(overhead_per_packet_bytes);
});
auto update_task = [this, overhead_per_packet_bytes] {
RTC_DCHECK_RUN_ON(worker_queue_);
if (total_packet_overhead_bytes_ != overhead_per_packet_bytes) {
total_packet_overhead_bytes_ = overhead_per_packet_bytes;
if (registered_with_allocator_) {
ConfigureBitrateObserver();
}
}
};
if (worker_queue_->IsCurrent()) {
update_task();
} else {
worker_queue_->PostTask(update_task);
}
}
size_t AudioSendStream::TestOnlyGetPerPacketOverheadBytes() const {
@ -602,7 +599,6 @@ const internal::AudioState* AudioSendStream::audio_state() const {
void AudioSendStream::StoreEncoderProperties(int sample_rate_hz,
size_t num_channels) {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
encoder_sample_rate_hz_ = sample_rate_hz;
encoder_num_channels_ = num_channels;
if (sending_) {
@ -800,7 +796,6 @@ void AudioSendStream::ReconfigureCNG(const Config& new_config) {
void AudioSendStream::ReconfigureBitrateObserver(
const webrtc::AudioSendStream::Config& new_config) {
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
// Since the Config's default is for both of these to be -1, this test will
// allow us to configure the bitrate observer if the new config has bitrate
// limits set, but would only have us call RemoveBitrateObserver if we were
@ -819,20 +814,13 @@ void AudioSendStream::ReconfigureBitrateObserver(
rtp_transport_->AccountForAudioPacketsInPacedSender(true);
if (send_side_bwe_with_overhead_)
rtp_transport_->IncludeOverheadInPacedSender();
rtc::Event thread_sync_event;
worker_queue_->PostTask([&] {
RTC_DCHECK_RUN_ON(worker_queue_);
// We may get a callback immediately as the observer is registered, so
// make
// sure the bitrate limits in config_ are up-to-date.
// make sure the bitrate limits in config_ are up-to-date.
config_.min_bitrate_bps = new_config.min_bitrate_bps;
config_.max_bitrate_bps = new_config.max_bitrate_bps;
config_.bitrate_priority = new_config.bitrate_priority;
ConfigureBitrateObserver();
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
rtp_rtcp_module_->SetAsPartOfAllocation(true);
} else {
rtp_transport_->AccountForAudioPacketsInPacedSender(false);
@ -845,6 +833,7 @@ void AudioSendStream::ConfigureBitrateObserver() {
// This either updates the current observer or adds a new observer.
// TODO(srte): Add overhead compensation here.
auto constraints = GetMinMaxBitrateConstraints();
RTC_DCHECK(constraints.has_value());
DataRate priority_bitrate = allocation_settings_.priority_bitrate;
if (send_side_bwe_with_overhead_) {
@ -866,30 +855,40 @@ void AudioSendStream::ConfigureBitrateObserver() {
if (allocation_settings_.priority_bitrate_raw)
priority_bitrate = *allocation_settings_.priority_bitrate_raw;
worker_queue_->PostTask([this, constraints, priority_bitrate,
config_bitrate_priority = config_.bitrate_priority] {
RTC_DCHECK_RUN_ON(worker_queue_);
bitrate_allocator_->AddObserver(
this,
MediaStreamAllocationConfig{
constraints.min.bps<uint32_t>(), constraints.max.bps<uint32_t>(), 0,
priority_bitrate.bps(), true,
constraints->min.bps<uint32_t>(), constraints->max.bps<uint32_t>(),
0, priority_bitrate.bps(), true,
allocation_settings_.bitrate_priority.value_or(
config_.bitrate_priority)});
config_bitrate_priority)});
});
registered_with_allocator_ = true;
}
void AudioSendStream::RemoveBitrateObserver() {
RTC_DCHECK(worker_thread_checker_.IsCurrent());
registered_with_allocator_ = false;
rtc::Event thread_sync_event;
worker_queue_->PostTask([this, &thread_sync_event] {
RTC_DCHECK_RUN_ON(worker_queue_);
registered_with_allocator_ = false;
bitrate_allocator_->RemoveObserver(this);
thread_sync_event.Set();
});
thread_sync_event.Wait(rtc::Event::kForever);
}
AudioSendStream::TargetAudioBitrateConstraints
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
AudioSendStream::GetMinMaxBitrateConstraints() const {
if (config_.min_bitrate_bps < 0 || config_.max_bitrate_bps < 0) {
RTC_LOG(LS_WARNING) << "Config is invalid: min_bitrate_bps="
<< config_.min_bitrate_bps
<< "; max_bitrate_bps=" << config_.max_bitrate_bps
<< "; both expected greater or equal to 0";
return absl::nullopt;
}
TargetAudioBitrateConstraints constraints{
DataRate::BitsPerSec(config_.min_bitrate_bps),
DataRate::BitsPerSec(config_.max_bitrate_bps)};
@ -902,7 +901,11 @@ AudioSendStream::GetMinMaxBitrateConstraints() const {
RTC_DCHECK_GE(constraints.min, DataRate::Zero());
RTC_DCHECK_GE(constraints.max, DataRate::Zero());
RTC_DCHECK_GE(constraints.max, constraints.min);
if (constraints.max < constraints.min) {
RTC_LOG(LS_WARNING) << "TargetAudioBitrateConstraints::max is less than "
<< "TargetAudioBitrateConstraints::min";
return absl::nullopt;
}
if (send_side_bwe_with_overhead_) {
if (use_legacy_overhead_calculation_) {
// OverheadPerPacket = Ipv4(20B) + UDP(8B) + SRTP(10B) + RTP(12)
@ -913,7 +916,10 @@ AudioSendStream::GetMinMaxBitrateConstraints() const {
constraints.min += kMinOverhead;
constraints.max += kMinOverhead;
} else {
RTC_DCHECK(frame_length_range_);
if (!frame_length_range_.has_value()) {
RTC_LOG(LS_WARNING) << "frame_length_range_ is not set";
return absl::nullopt;
}
const DataSize kOverheadPerPacket =
DataSize::Bytes(total_packet_overhead_bytes_);
constraints.min += kOverheadPerPacket / frame_length_range_->second;
@ -927,5 +933,18 @@ void AudioSendStream::RegisterCngPayloadType(int payload_type,
int clockrate_hz) {
channel_send_->RegisterCngPayloadType(payload_type, clockrate_hz);
}
void AudioSendStream::UpdateCachedTargetAudioBitrateConstraints() {
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
new_constraints = GetMinMaxBitrateConstraints();
if (!new_constraints.has_value()) {
return;
}
worker_queue_->PostTask([this, new_constraints]() {
RTC_DCHECK_RUN_ON(worker_queue_);
cached_constraints_ = new_constraints;
});
}
} // namespace internal
} // namespace webrtc

View File

@ -24,8 +24,8 @@
#include "rtc_base/experiments/struct_parameters_parser.h"
#include "rtc_base/race_checker.h"
#include "rtc_base/synchronization/mutex.h"
#include "rtc_base/synchronization/sequence_checker.h"
#include "rtc_base/task_queue.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
class RtcEventLog;
@ -121,22 +121,29 @@ class AudioSendStream final : public webrtc::AudioSendStream,
internal::AudioState* audio_state();
const internal::AudioState* audio_state() const;
void StoreEncoderProperties(int sample_rate_hz, size_t num_channels);
void StoreEncoderProperties(int sample_rate_hz, size_t num_channels)
RTC_RUN_ON(worker_thread_checker_);
void ConfigureStream(const Config& new_config, bool first_time);
bool SetupSendCodec(const Config& new_config);
bool ReconfigureSendCodec(const Config& new_config);
void ReconfigureANA(const Config& new_config);
void ReconfigureCNG(const Config& new_config);
void ReconfigureBitrateObserver(const Config& new_config);
void ConfigureStream(const Config& new_config, bool first_time)
RTC_RUN_ON(worker_thread_checker_);
bool SetupSendCodec(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
bool ReconfigureSendCodec(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureANA(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureCNG(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ReconfigureBitrateObserver(const Config& new_config)
RTC_RUN_ON(worker_thread_checker_);
void ConfigureBitrateObserver() RTC_RUN_ON(worker_queue_);
void RemoveBitrateObserver();
void ConfigureBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
void RemoveBitrateObserver() RTC_RUN_ON(worker_thread_checker_);
// Returns bitrate constraints, maybe including overhead when enabled by
// field trial.
TargetAudioBitrateConstraints GetMinMaxBitrateConstraints() const
RTC_RUN_ON(worker_queue_);
absl::optional<TargetAudioBitrateConstraints> GetMinMaxBitrateConstraints()
const RTC_RUN_ON(worker_thread_checker_);
// Sets per-packet overhead on encoded (for ANA) based on current known values
// of transport and packetization overheads.
@ -147,11 +154,16 @@ class AudioSendStream final : public webrtc::AudioSendStream,
size_t GetPerPacketOverheadBytes() const
RTC_EXCLUSIVE_LOCKS_REQUIRED(overhead_per_packet_lock_);
void RegisterCngPayloadType(int payload_type, int clockrate_hz);
void RegisterCngPayloadType(int payload_type, int clockrate_hz)
RTC_RUN_ON(worker_thread_checker_);
void UpdateCachedTargetAudioBitrateConstraints()
RTC_RUN_ON(worker_thread_checker_);
Clock* clock_;
rtc::ThreadChecker worker_thread_checker_;
rtc::ThreadChecker pacer_thread_checker_;
SequenceChecker worker_thread_checker_;
SequenceChecker pacer_thread_checker_;
rtc::RaceChecker audio_capture_race_checker_;
rtc::TaskQueue* worker_queue_;
@ -161,15 +173,16 @@ class AudioSendStream final : public webrtc::AudioSendStream,
const bool send_side_bwe_with_overhead_;
const AudioAllocationConfig allocation_settings_;
webrtc::AudioSendStream::Config config_;
webrtc::AudioSendStream::Config config_
RTC_GUARDED_BY(worker_thread_checker_);
rtc::scoped_refptr<webrtc::AudioState> audio_state_;
const std::unique_ptr<voe::ChannelSendInterface> channel_send_;
RtcEventLog* const event_log_;
const bool use_legacy_overhead_calculation_;
int encoder_sample_rate_hz_ = 0;
size_t encoder_num_channels_ = 0;
bool sending_ = false;
int encoder_sample_rate_hz_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
size_t encoder_num_channels_ RTC_GUARDED_BY(worker_thread_checker_) = 0;
bool sending_ RTC_GUARDED_BY(worker_thread_checker_) = false;
mutable Mutex audio_level_lock_;
// Keeps track of audio level, total audio energy and total samples duration.
// https://w3c.github.io/webrtc-stats/#dom-rtcaudiohandlerstats-totalaudioenergy
@ -177,6 +190,9 @@ class AudioSendStream final : public webrtc::AudioSendStream,
BitrateAllocatorInterface* const bitrate_allocator_
RTC_GUARDED_BY(worker_queue_);
// Constrains cached to be accessed from |worker_queue_|.
absl::optional<AudioSendStream::TargetAudioBitrateConstraints>
cached_constraints_ RTC_GUARDED_BY(worker_queue_) = absl::nullopt;
RtpTransportControllerSendInterface* const rtp_transport_;
RtpRtcpInterface* const rtp_rtcp_module_;
@ -205,10 +221,12 @@ class AudioSendStream final : public webrtc::AudioSendStream,
size_t transport_overhead_per_packet_bytes_
RTC_GUARDED_BY(overhead_per_packet_lock_) = 0;
bool registered_with_allocator_ RTC_GUARDED_BY(worker_queue_) = false;
size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_queue_) = 0;
bool registered_with_allocator_ RTC_GUARDED_BY(worker_thread_checker_) =
false;
size_t total_packet_overhead_bytes_ RTC_GUARDED_BY(worker_thread_checker_) =
0;
absl::optional<std::pair<TimeDelta, TimeDelta>> frame_length_range_
RTC_GUARDED_BY(worker_queue_);
RTC_GUARDED_BY(worker_thread_checker_);
};
} // namespace internal
} // namespace webrtc