Adds ChannelSend specific encoder task queue.
Before this change the encoder tasks runs on a shared worker queue. That makes the destruction require synchronization to avoid races. By keeping a separate encode queue to ChannelSend, we can safely destruct the object without worrying for left over tasks, as they will be stopped when the task queue is destroyed. For TaskQueue implementations using one thread per TaskQueue this will increase the thread count by the number of AudioSendStreams, which typically is just one. This is partly a reland of 9b9344742b186b14d87e827e71a1757f4c94b30e Original change's description: > Removes lock from ChannelSend. > > The lock isn't really needed as encoder_queue_is_active_ can be checked > on the task queue to provide synchronization. > > There is one behavioral change due to this: We will not cancel any currently > pending encoding tasks when we stop sending, they will be allowed to finish. > > Bug: webrtc:10365 > Change-Id: I2b4897dde8d49bc7ee5d2d69694616aee8aaea38 > Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125096 > Reviewed-by: Oskar Sundbom <ossu@webrtc.org> > Commit-Queue: Sebastian Jansson <srte@webrtc.org> > Cr-Commit-Position: refs/heads/master@{#26963} Bug: webrtc:10365 Change-Id: Iafb84e25d90ec8639359be80fad65763d08e5719 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/125740 Reviewed-by: Oskar Sundbom <ossu@webrtc.org> Commit-Queue: Sebastian Jansson <srte@webrtc.org> Cr-Commit-Position: refs/heads/master@{#27038}
This commit is contained in:

committed by
Commit Bot

parent
e01857cca4
commit
44dd9f29c7
@ -85,6 +85,7 @@ AudioSendStream::AudioSendStream(
|
||||
Clock* clock,
|
||||
const webrtc::AudioSendStream::Config& config,
|
||||
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
RtpTransportControllerSendInterface* rtp_transport,
|
||||
BitrateAllocatorInterface* bitrate_allocator,
|
||||
@ -94,13 +95,14 @@ AudioSendStream::AudioSendStream(
|
||||
: AudioSendStream(clock,
|
||||
config,
|
||||
audio_state,
|
||||
task_queue_factory,
|
||||
rtp_transport,
|
||||
bitrate_allocator,
|
||||
event_log,
|
||||
rtcp_rtt_stats,
|
||||
suspended_rtp_state,
|
||||
voe::CreateChannelSend(clock,
|
||||
rtp_transport->GetWorkerQueue(),
|
||||
task_queue_factory,
|
||||
module_process_thread,
|
||||
config.media_transport,
|
||||
/*overhead_observer=*/this,
|
||||
@ -116,6 +118,7 @@ AudioSendStream::AudioSendStream(
|
||||
Clock* clock,
|
||||
const webrtc::AudioSendStream::Config& config,
|
||||
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
RtpTransportControllerSendInterface* rtp_transport,
|
||||
BitrateAllocatorInterface* bitrate_allocator,
|
||||
RtcEventLog* event_log,
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include "rtc_base/constructor_magic.h"
|
||||
#include "rtc_base/experiments/audio_allocation_settings.h"
|
||||
#include "rtc_base/race_checker.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
#include "rtc_base/thread_checker.h"
|
||||
|
||||
namespace webrtc {
|
||||
@ -42,6 +43,7 @@ class AudioSendStream final : public webrtc::AudioSendStream,
|
||||
AudioSendStream(Clock* clock,
|
||||
const webrtc::AudioSendStream::Config& config,
|
||||
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
RtpTransportControllerSendInterface* rtp_transport,
|
||||
BitrateAllocatorInterface* bitrate_allocator,
|
||||
@ -52,6 +54,7 @@ class AudioSendStream final : public webrtc::AudioSendStream,
|
||||
AudioSendStream(Clock* clock,
|
||||
const webrtc::AudioSendStream::Config& config,
|
||||
const rtc::scoped_refptr<webrtc::AudioState>& audio_state,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
RtpTransportControllerSendInterface* rtp_transport,
|
||||
BitrateAllocatorInterface* bitrate_allocator,
|
||||
RtcEventLog* event_log,
|
||||
|
@ -167,8 +167,8 @@ struct ConfigHelper {
|
||||
return std::unique_ptr<internal::AudioSendStream>(
|
||||
new internal::AudioSendStream(
|
||||
Clock::GetRealTimeClock(), stream_config_, audio_state_,
|
||||
&rtp_transport_, &bitrate_allocator_, &event_log_, &rtcp_rtt_stats_,
|
||||
absl::nullopt,
|
||||
&GlobalTaskQueueFactory(), &rtp_transport_, &bitrate_allocator_,
|
||||
&event_log_, &rtcp_rtt_stats_, absl::nullopt,
|
||||
std::unique_ptr<voe::ChannelSendInterface>(channel_send_)));
|
||||
}
|
||||
|
||||
|
@ -31,7 +31,6 @@
|
||||
#include "modules/pacing/packet_router.h"
|
||||
#include "modules/utility/include/process_thread.h"
|
||||
#include "rtc_base/checks.h"
|
||||
#include "rtc_base/critical_section.h"
|
||||
#include "rtc_base/event.h"
|
||||
#include "rtc_base/format_macros.h"
|
||||
#include "rtc_base/location.h"
|
||||
@ -88,7 +87,7 @@ class ChannelSend
|
||||
friend class VoERtcpObserver;
|
||||
|
||||
ChannelSend(Clock* clock,
|
||||
rtc::TaskQueue* encoder_queue,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
MediaTransportInterface* media_transport,
|
||||
OverheadObserver* overhead_observer,
|
||||
@ -181,8 +180,6 @@ class ChannelSend
|
||||
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) override;
|
||||
|
||||
private:
|
||||
class ProcessAndEncodeAudioTask;
|
||||
|
||||
// From AudioPacketizationCallback in the ACM
|
||||
int32_t SendData(AudioFrameType frameType,
|
||||
uint8_t payloadType,
|
||||
@ -200,20 +197,23 @@ class ChannelSend
|
||||
uint8_t payloadType,
|
||||
uint32_t timeStamp,
|
||||
rtc::ArrayView<const uint8_t> payload,
|
||||
const RTPFragmentationHeader* fragmentation);
|
||||
const RTPFragmentationHeader* fragmentation)
|
||||
RTC_RUN_ON(encoder_queue_);
|
||||
|
||||
int32_t SendMediaTransportAudio(AudioFrameType frameType,
|
||||
uint8_t payloadType,
|
||||
uint32_t timeStamp,
|
||||
rtc::ArrayView<const uint8_t> payload,
|
||||
const RTPFragmentationHeader* fragmentation);
|
||||
const RTPFragmentationHeader* fragmentation)
|
||||
RTC_RUN_ON(encoder_queue_);
|
||||
|
||||
// Return media transport or nullptr if using RTP.
|
||||
MediaTransportInterface* media_transport() { return media_transport_; }
|
||||
|
||||
// Called on the encoder task queue when a new input audio frame is ready
|
||||
// for encoding.
|
||||
void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input);
|
||||
void ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input)
|
||||
RTC_RUN_ON(encoder_queue_);
|
||||
|
||||
void OnReceivedRtt(int64_t rtt_ms);
|
||||
|
||||
@ -267,9 +267,7 @@ class ChannelSend
|
||||
|
||||
const bool use_twcc_plr_for_ana_;
|
||||
|
||||
rtc::CriticalSection encoder_queue_lock_;
|
||||
bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_lock_) = false;
|
||||
rtc::TaskQueue* const encoder_queue_ = nullptr;
|
||||
bool encoder_queue_is_active_ RTC_GUARDED_BY(encoder_queue_) = false;
|
||||
|
||||
MediaTransportInterface* const media_transport_;
|
||||
int media_transport_sequence_number_ RTC_GUARDED_BY(encoder_queue_) = 0;
|
||||
@ -286,12 +284,17 @@ class ChannelSend
|
||||
RTC_GUARDED_BY(&media_transport_lock_);
|
||||
|
||||
// E2EE Audio Frame Encryption
|
||||
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_;
|
||||
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor_
|
||||
RTC_GUARDED_BY(encoder_queue_);
|
||||
// E2EE Frame Encryption Options
|
||||
const webrtc::CryptoOptions crypto_options_;
|
||||
|
||||
rtc::CriticalSection bitrate_crit_section_;
|
||||
int configured_bitrate_bps_ RTC_GUARDED_BY(bitrate_crit_section_) = 0;
|
||||
|
||||
// Defined last to ensure that there are no running tasks when the other
|
||||
// members are destroyed.
|
||||
rtc::TaskQueue encoder_queue_;
|
||||
};
|
||||
|
||||
const int kTelephoneEventAttenuationdB = 10;
|
||||
@ -473,32 +476,13 @@ class VoERtcpObserver : public RtcpBandwidthObserver {
|
||||
RtcpBandwidthObserver* bandwidth_observer_ RTC_GUARDED_BY(crit_);
|
||||
};
|
||||
|
||||
class ChannelSend::ProcessAndEncodeAudioTask : public rtc::QueuedTask {
|
||||
public:
|
||||
ProcessAndEncodeAudioTask(std::unique_ptr<AudioFrame> audio_frame,
|
||||
ChannelSend* channel)
|
||||
: audio_frame_(std::move(audio_frame)), channel_(channel) {
|
||||
RTC_DCHECK(channel_);
|
||||
}
|
||||
|
||||
private:
|
||||
bool Run() override {
|
||||
RTC_DCHECK_RUN_ON(channel_->encoder_queue_);
|
||||
channel_->ProcessAndEncodeAudioOnTaskQueue(audio_frame_.get());
|
||||
return true;
|
||||
}
|
||||
|
||||
std::unique_ptr<AudioFrame> audio_frame_;
|
||||
ChannelSend* const channel_;
|
||||
};
|
||||
|
||||
int32_t ChannelSend::SendData(AudioFrameType frameType,
|
||||
uint8_t payloadType,
|
||||
uint32_t timeStamp,
|
||||
const uint8_t* payloadData,
|
||||
size_t payloadSize,
|
||||
const RTPFragmentationHeader* fragmentation) {
|
||||
RTC_DCHECK_RUN_ON(encoder_queue_);
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
rtc::ArrayView<const uint8_t> payload(payloadData, payloadSize);
|
||||
|
||||
if (media_transport() != nullptr) {
|
||||
@ -521,7 +505,6 @@ int32_t ChannelSend::SendRtpAudio(AudioFrameType frameType,
|
||||
uint32_t timeStamp,
|
||||
rtc::ArrayView<const uint8_t> payload,
|
||||
const RTPFragmentationHeader* fragmentation) {
|
||||
RTC_DCHECK_RUN_ON(encoder_queue_);
|
||||
if (_includeAudioLevelIndication) {
|
||||
// Store current audio level in the RTP sender.
|
||||
// The level will be used in combination with voice-activity state
|
||||
@ -594,7 +577,6 @@ int32_t ChannelSend::SendMediaTransportAudio(
|
||||
uint32_t timeStamp,
|
||||
rtc::ArrayView<const uint8_t> payload,
|
||||
const RTPFragmentationHeader* fragmentation) {
|
||||
RTC_DCHECK_RUN_ON(encoder_queue_);
|
||||
// TODO(nisse): Use null _transportPtr for MediaTransport.
|
||||
// RTC_DCHECK(_transportPtr == nullptr);
|
||||
uint64_t channel_id;
|
||||
@ -645,7 +627,7 @@ int32_t ChannelSend::SendMediaTransportAudio(
|
||||
}
|
||||
|
||||
ChannelSend::ChannelSend(Clock* clock,
|
||||
rtc::TaskQueue* encoder_queue,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
MediaTransportInterface* media_transport,
|
||||
OverheadObserver* overhead_observer,
|
||||
@ -671,12 +653,13 @@ ChannelSend::ChannelSend(Clock* clock,
|
||||
new RateLimiter(clock, kMaxRetransmissionWindowMs)),
|
||||
use_twcc_plr_for_ana_(
|
||||
webrtc::field_trial::FindFullName("UseTwccPlrForAna") == "Enabled"),
|
||||
encoder_queue_(encoder_queue),
|
||||
media_transport_(media_transport),
|
||||
frame_encryptor_(frame_encryptor),
|
||||
crypto_options_(crypto_options) {
|
||||
crypto_options_(crypto_options),
|
||||
encoder_queue_(task_queue_factory->CreateTaskQueue(
|
||||
"AudioEncoder",
|
||||
TaskQueueFactory::Priority::NORMAL)) {
|
||||
RTC_DCHECK(module_process_thread);
|
||||
RTC_DCHECK(encoder_queue);
|
||||
module_process_thread_checker_.DetachFromThread();
|
||||
|
||||
audio_coding_.reset(AudioCodingModule::Create(AudioCodingModule::Config()));
|
||||
@ -763,11 +746,11 @@ void ChannelSend::StartSend() {
|
||||
_rtpRtcpModule->SetSendingMediaStatus(true);
|
||||
int ret = _rtpRtcpModule->SetSendingStatus(true);
|
||||
RTC_DCHECK_EQ(0, ret);
|
||||
{
|
||||
// It is now OK to start posting tasks to the encoder task queue.
|
||||
rtc::CritScope cs(&encoder_queue_lock_);
|
||||
// It is now OK to start processing on the encoder task queue.
|
||||
encoder_queue_.PostTask([this] {
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
encoder_queue_is_active_ = true;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void ChannelSend::StopSend() {
|
||||
@ -777,22 +760,12 @@ void ChannelSend::StopSend() {
|
||||
}
|
||||
sending_ = false;
|
||||
|
||||
// Post a task to the encoder thread which sets an event when the task is
|
||||
// executed. We know that no more encoding tasks will be added to the task
|
||||
// queue for this channel since sending is now deactivated. It means that,
|
||||
// if we wait for the event to bet set, we know that no more pending tasks
|
||||
// exists and it is therfore guaranteed that the task queue will never try
|
||||
// to acccess and invalid channel object.
|
||||
RTC_DCHECK(encoder_queue_);
|
||||
|
||||
rtc::Event flush;
|
||||
{
|
||||
// Clear |encoder_queue_is_active_| under lock to prevent any other tasks
|
||||
// than this final "flush task" to be posted on the queue.
|
||||
rtc::CritScope cs(&encoder_queue_lock_);
|
||||
encoder_queue_.PostTask([this, &flush]() {
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
encoder_queue_is_active_ = false;
|
||||
encoder_queue_->PostTask([&flush]() { flush.Set(); });
|
||||
}
|
||||
flush.Set();
|
||||
});
|
||||
flush.Wait(rtc::Event::kForever);
|
||||
|
||||
// Reset sending SSRC and sequence number and triggers direct transmission
|
||||
@ -1115,20 +1088,24 @@ CallSendStatistics ChannelSend::GetRTCPStatistics() const {
|
||||
void ChannelSend::ProcessAndEncodeAudio(
|
||||
std::unique_ptr<AudioFrame> audio_frame) {
|
||||
RTC_DCHECK_RUNS_SERIALIZED(&audio_thread_race_checker_);
|
||||
// Avoid posting any new tasks if sending was already stopped in StopSend().
|
||||
rtc::CritScope cs(&encoder_queue_lock_);
|
||||
if (!encoder_queue_is_active_) {
|
||||
struct ProcessAndEncodeAudio {
|
||||
void operator()() {
|
||||
RTC_DCHECK_RUN_ON(&channel->encoder_queue_);
|
||||
if (!channel->encoder_queue_is_active_) {
|
||||
return;
|
||||
}
|
||||
channel->ProcessAndEncodeAudioOnTaskQueue(audio_frame.get());
|
||||
}
|
||||
std::unique_ptr<AudioFrame> audio_frame;
|
||||
ChannelSend* const channel;
|
||||
};
|
||||
// Profile time between when the audio frame is added to the task queue and
|
||||
// when the task is actually executed.
|
||||
audio_frame->UpdateProfileTimeStamp();
|
||||
encoder_queue_->PostTask(std::unique_ptr<rtc::QueuedTask>(
|
||||
new ProcessAndEncodeAudioTask(std::move(audio_frame), this)));
|
||||
encoder_queue_.PostTask(ProcessAndEncodeAudio{std::move(audio_frame), this});
|
||||
}
|
||||
|
||||
void ChannelSend::ProcessAndEncodeAudioOnTaskQueue(AudioFrame* audio_input) {
|
||||
RTC_DCHECK_RUN_ON(encoder_queue_);
|
||||
RTC_DCHECK_GT(audio_input->samples_per_channel_, 0);
|
||||
RTC_DCHECK_LE(audio_input->num_channels_, 2);
|
||||
|
||||
@ -1233,14 +1210,10 @@ int64_t ChannelSend::GetRTT() const {
|
||||
void ChannelSend::SetFrameEncryptor(
|
||||
rtc::scoped_refptr<FrameEncryptorInterface> frame_encryptor) {
|
||||
RTC_DCHECK_RUN_ON(&worker_thread_checker_);
|
||||
rtc::CritScope cs(&encoder_queue_lock_);
|
||||
if (encoder_queue_is_active_) {
|
||||
encoder_queue_->PostTask([this, frame_encryptor]() mutable {
|
||||
this->frame_encryptor_ = std::move(frame_encryptor);
|
||||
});
|
||||
} else {
|
||||
encoder_queue_.PostTask([this, frame_encryptor]() mutable {
|
||||
RTC_DCHECK_RUN_ON(&encoder_queue_);
|
||||
frame_encryptor_ = std::move(frame_encryptor);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// TODO(sukhanov): Consider moving TargetTransferRate observer to
|
||||
@ -1261,7 +1234,7 @@ void ChannelSend::OnReceivedRtt(int64_t rtt_ms) {
|
||||
|
||||
std::unique_ptr<ChannelSendInterface> CreateChannelSend(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* encoder_queue,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
MediaTransportInterface* media_transport,
|
||||
OverheadObserver* overhead_observer,
|
||||
@ -1273,7 +1246,7 @@ std::unique_ptr<ChannelSendInterface> CreateChannelSend(
|
||||
bool extmap_allow_mixed,
|
||||
int rtcp_report_interval_ms) {
|
||||
return absl::make_unique<ChannelSend>(
|
||||
clock, encoder_queue, module_process_thread, media_transport,
|
||||
clock, task_queue_factory, module_process_thread, media_transport,
|
||||
overhead_observer, rtp_transport, rtcp_rtt_stats, rtc_event_log,
|
||||
frame_encryptor, crypto_options, extmap_allow_mixed,
|
||||
rtcp_report_interval_ms);
|
||||
|
@ -19,10 +19,10 @@
|
||||
#include "api/audio_codecs/audio_encoder.h"
|
||||
#include "api/crypto/crypto_options.h"
|
||||
#include "api/media_transport_interface.h"
|
||||
#include "api/task_queue/task_queue_factory.h"
|
||||
#include "modules/rtp_rtcp/include/rtp_rtcp.h"
|
||||
#include "modules/rtp_rtcp/source/rtp_sender_audio.h"
|
||||
#include "rtc_base/function_view.h"
|
||||
#include "rtc_base/task_queue.h"
|
||||
|
||||
namespace webrtc {
|
||||
|
||||
@ -119,7 +119,7 @@ class ChannelSendInterface {
|
||||
|
||||
std::unique_ptr<ChannelSendInterface> CreateChannelSend(
|
||||
Clock* clock,
|
||||
rtc::TaskQueue* encoder_queue,
|
||||
TaskQueueFactory* task_queue_factory,
|
||||
ProcessThread* module_process_thread,
|
||||
MediaTransportInterface* media_transport,
|
||||
OverheadObserver* overhead_observer,
|
||||
|
@ -131,8 +131,8 @@ TEST(AudioWithMediaTransport, DeliversAudio) {
|
||||
&GlobalTaskQueueFactory());
|
||||
webrtc::internal::AudioSendStream send_stream(
|
||||
Clock::GetRealTimeClock(), send_config, audio_state,
|
||||
send_process_thread.get(), &rtp_transport, &bitrate_allocator,
|
||||
null_event_log.get(),
|
||||
&GlobalTaskQueueFactory(), send_process_thread.get(), &rtp_transport,
|
||||
&bitrate_allocator, null_event_log.get(),
|
||||
/*rtcp_rtt_stats=*/nullptr, absl::optional<RtpState>());
|
||||
|
||||
audio_device->Init(); // Starts thread.
|
||||
|
@ -666,10 +666,11 @@ webrtc::AudioSendStream* Call::CreateAudioSendStream(
|
||||
}
|
||||
}
|
||||
|
||||
AudioSendStream* send_stream = new AudioSendStream(
|
||||
clock_, config, config_.audio_state, module_process_thread_.get(),
|
||||
transport_send_ptr_, bitrate_allocator_.get(), event_log_,
|
||||
call_stats_.get(), suspended_rtp_state);
|
||||
AudioSendStream* send_stream =
|
||||
new AudioSendStream(clock_, config, config_.audio_state,
|
||||
task_queue_factory_, module_process_thread_.get(),
|
||||
transport_send_ptr_, bitrate_allocator_.get(),
|
||||
event_log_, call_stats_.get(), suspended_rtp_state);
|
||||
{
|
||||
WriteLockScoped write_lock(*send_crit_);
|
||||
RTC_DCHECK(audio_send_ssrcs_.find(config.rtp.ssrc) ==
|
||||
|
Reference in New Issue
Block a user