diff --git a/modules/audio_processing/aec3/BUILD.gn b/modules/audio_processing/aec3/BUILD.gn index b34ed226db..189bcfd712 100644 --- a/modules/audio_processing/aec3/BUILD.gn +++ b/modules/audio_processing/aec3/BUILD.gn @@ -75,7 +75,6 @@ rtc_static_library("aec3") { "matched_filter_lag_aggregator.h", "matrix_buffer.cc", "matrix_buffer.h", - "message_queue.h", "moving_average.cc", "moving_average.h", "render_buffer.cc", diff --git a/modules/audio_processing/aec3/echo_canceller3.cc b/modules/audio_processing/aec3/echo_canceller3.cc index fc802b1f96..e3846058f2 100644 --- a/modules/audio_processing/aec3/echo_canceller3.cc +++ b/modules/audio_processing/aec3/echo_canceller3.cc @@ -276,13 +276,13 @@ const int kNumberOfHighPassBiQuads_16kHz = 1; class EchoCanceller3::RenderWriter { public: - RenderWriter( - ApmDataDumper* data_dumper, - MessageQueue>>* render_transfer_queue, - std::unique_ptr render_highpass_filter, - int sample_rate_hz, - int frame_length, - int num_bands); + RenderWriter(ApmDataDumper* data_dumper, + SwapQueue>, + Aec3RenderQueueItemVerifier>* render_transfer_queue, + std::unique_ptr render_highpass_filter, + int sample_rate_hz, + int frame_length, + int num_bands); ~RenderWriter(); void Insert(AudioBuffer* input); @@ -293,13 +293,15 @@ class EchoCanceller3::RenderWriter { const int num_bands_; std::unique_ptr render_highpass_filter_; std::vector> render_queue_input_frame_; - MessageQueue>>* render_transfer_queue_; + SwapQueue>, Aec3RenderQueueItemVerifier>* + render_transfer_queue_; RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RenderWriter); }; EchoCanceller3::RenderWriter::RenderWriter( ApmDataDumper* data_dumper, - MessageQueue>>* render_transfer_queue, + SwapQueue>, Aec3RenderQueueItemVerifier>* + render_transfer_queue, std::unique_ptr render_highpass_filter, int sample_rate_hz, int frame_length, @@ -368,10 +370,12 @@ EchoCanceller3::EchoCanceller3(const EchoCanceller3Config& config, output_framer_(num_bands_), capture_blocker_(num_bands_), render_blocker_(num_bands_), - render_transfer_queue_(kRenderTransferQueueSizeFrames, - std::vector>( - num_bands_, - std::vector(frame_length_, 0.f))), + render_transfer_queue_( + kRenderTransferQueueSizeFrames, + std::vector>( + num_bands_, + std::vector(frame_length_, 0.f)), + Aec3RenderQueueItemVerifier(num_bands_, frame_length_)), block_processor_(std::move(block_processor)), render_queue_output_frame_(num_bands_, std::vector(frame_length_, 0.f)), diff --git a/modules/audio_processing/aec3/echo_canceller3.h b/modules/audio_processing/aec3/echo_canceller3.h index cb3b382628..c1298d207e 100644 --- a/modules/audio_processing/aec3/echo_canceller3.h +++ b/modules/audio_processing/aec3/echo_canceller3.h @@ -24,12 +24,12 @@ #include "modules/audio_processing/aec3/block_processor.h" #include "modules/audio_processing/aec3/cascaded_biquad_filter.h" #include "modules/audio_processing/aec3/frame_blocker.h" -#include "modules/audio_processing/aec3/message_queue.h" #include "modules/audio_processing/audio_buffer.h" #include "modules/audio_processing/logging/apm_data_dumper.h" #include "rtc_base/checks.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/race_checker.h" +#include "rtc_base/swap_queue.h" #include "rtc_base/thread_annotations.h" namespace webrtc { @@ -107,7 +107,7 @@ class EchoCanceller3 : public EchoControl { private: class RenderWriter; - // Empties the render MessageQueue. + // Empties the render SwapQueue. void EmptyRenderQueue(); rtc::RaceChecker capture_race_checker_; @@ -127,7 +127,8 @@ class EchoCanceller3 : public EchoControl { BlockFramer output_framer_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker capture_blocker_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker render_blocker_ RTC_GUARDED_BY(capture_race_checker_); - MessageQueue>> render_transfer_queue_; + SwapQueue>, Aec3RenderQueueItemVerifier> + render_transfer_queue_; std::unique_ptr block_processor_ RTC_GUARDED_BY(capture_race_checker_); std::vector> render_queue_output_frame_ diff --git a/modules/audio_processing/aec3/echo_canceller3_unittest.cc b/modules/audio_processing/aec3/echo_canceller3_unittest.cc index ab2ae04809..3f1e059a0c 100644 --- a/modules/audio_processing/aec3/echo_canceller3_unittest.cc +++ b/modules/audio_processing/aec3/echo_canceller3_unittest.cc @@ -461,7 +461,7 @@ class EchoCanceller3Tester { // This test verifies that the swapqueue is able to handle jitter in the // capture and render API calls. - void RunRenderMessageQueueVerificationTest() { + void RunRenderSwapQueueVerificationTest() { const EchoCanceller3Config config; EchoCanceller3 aec3( config, sample_rate_hz_, false, @@ -502,7 +502,7 @@ class EchoCanceller3Tester { // This test verifies that a buffer overrun in the render swapqueue is // properly reported. - void RunRenderPipelineMessageQueueOverrunReturnValueTest() { + void RunRenderPipelineSwapQueueOverrunReturnValueTest() { EchoCanceller3 aec3(EchoCanceller3Config(), sample_rate_hz_, false); constexpr size_t kRenderTransferQueueSize = 30; @@ -631,18 +631,18 @@ TEST(EchoCanceller3Buffering, RenderBitexactness) { } } -TEST(EchoCanceller3Buffering, RenderMessageQueue) { +TEST(EchoCanceller3Buffering, RenderSwapQueue) { for (auto rate : {8000, 16000}) { SCOPED_TRACE(ProduceDebugText(rate)); - EchoCanceller3Tester(rate).RunRenderMessageQueueVerificationTest(); + EchoCanceller3Tester(rate).RunRenderSwapQueueVerificationTest(); } } -TEST(EchoCanceller3Buffering, RenderMessageQueueOverrunReturnValue) { +TEST(EchoCanceller3Buffering, RenderSwapQueueOverrunReturnValue) { for (auto rate : {8000, 16000, 32000, 48000}) { SCOPED_TRACE(ProduceDebugText(rate)); EchoCanceller3Tester(rate) - .RunRenderPipelineMessageQueueOverrunReturnValueTest(); + .RunRenderPipelineSwapQueueOverrunReturnValueTest(); } } diff --git a/modules/audio_processing/aec3/message_queue.h b/modules/audio_processing/aec3/message_queue.h deleted file mode 100644 index ad07d52c81..0000000000 --- a/modules/audio_processing/aec3/message_queue.h +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2019 The WebRTC project authors. All Rights Reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_ -#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_ - -#include -#include -#include - -#include "rtc_base/checks.h" -#include "rtc_base/thread_checker.h" - -namespace webrtc { - -// Fixed-size circular queue similar to SwapQueue, but lock-free and no -// QueueItemVerifierFunction. -// The queue is designed for single-producer-single-consumer (accessed by one -// producer thread, calling Insert(), and one consumer thread, calling Remove(). -template -class MessageQueue { - public: - explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) { - producer_thread_checker_.DetachFromThread(); - consumer_thread_checker_.DetachFromThread(); - } - MessageQueue(size_t size, const T& prototype) - : num_elements_(0), queue_(size, prototype) { - producer_thread_checker_.DetachFromThread(); - consumer_thread_checker_.DetachFromThread(); - } - ~MessageQueue() = default; - - // Inserts a T at the back of the queue by swapping *input with a T from the - // queue. This function should not be called concurrently. It can however be - // called concurrently with Remove(). Returns true if the item was inserted or - // false if not (the queue was full). - bool Insert(T* input) { - RTC_DCHECK_RUN_ON(&producer_thread_checker_); - RTC_DCHECK(input); - - if (num_elements_ == queue_.size()) { - return false; - } - - std::swap(*input, queue_[next_write_index_]); - - ++next_write_index_; - if (next_write_index_ == queue_.size()) { - next_write_index_ = 0; - } - - ++num_elements_; - - RTC_DCHECK_LT(next_write_index_, queue_.size()); - - return true; - } - - // Removes the frontmost T from the queue by swapping it with the T in - // *output. This function should not be called concurrently. It can however be - // called concurrently with Insert(). Returns true if an item could be removed - // or false if not (the queue was empty). - bool Remove(T* output) { - RTC_DCHECK_RUN_ON(&consumer_thread_checker_); - RTC_DCHECK(output); - - if (num_elements_ == 0) { - return false; - } - - std::swap(*output, queue_[next_read_index_]); - - ++next_read_index_; - if (next_read_index_ == queue_.size()) { - next_read_index_ = 0; - } - - --num_elements_; - - RTC_DCHECK_LT(next_read_index_, queue_.size()); - - return true; - } - - private: - uint32_t next_write_index_ = 0; - uint32_t next_read_index_ = 0; - rtc::ThreadChecker producer_thread_checker_; - rtc::ThreadChecker consumer_thread_checker_; - std::atomic num_elements_; - std::vector queue_; -}; -} // namespace webrtc - -#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_