Revert "AEC3: Lockless transfer of render data to the capture thread"

This reverts commit 74ba99062c48b278675cfe52643719202296fddc.

Reason for revert: Breaks downstream project.

Original change's description:
> AEC3: Lockless transfer of render data to the capture thread
> 
> This CL implements a lockless queue that replaces SwapQueue
> in the RenderWriter. This avoid stalls when the render and
> capture threads are accessing the queue at the same time.
> 
> Bug: webrtc:10205
> Change-Id: Ie7d6fcf9c80fad957e2a90537658fb730ca2ed72
> Reviewed-on: https://webrtc-review.googlesource.com/c/117643
> Reviewed-by: Per Åhgren <peah@webrtc.org>
> Commit-Queue: Gustaf Ullberg <gustaf@webrtc.org>
> Cr-Commit-Position: refs/heads/master@{#26298}

TBR=gustaf@webrtc.org,peah@webrtc.org

Change-Id: Ie76ee8835da4e44982d181a152c9ffa19ff33e23
No-Presubmit: true
No-Tree-Checks: true
No-Try: true
Bug: webrtc:10205
Reviewed-on: https://webrtc-review.googlesource.com/c/118142
Reviewed-by: Mirko Bonadei <mbonadei@webrtc.org>
Commit-Queue: Mirko Bonadei <mbonadei@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26304}
This commit is contained in:
Mirko Bonadei
2019-01-17 20:43:58 +00:00
committed by Commit Bot
parent ea9845f16a
commit f0d9cda950
5 changed files with 27 additions and 126 deletions

View File

@ -75,7 +75,6 @@ rtc_static_library("aec3") {
"matched_filter_lag_aggregator.h", "matched_filter_lag_aggregator.h",
"matrix_buffer.cc", "matrix_buffer.cc",
"matrix_buffer.h", "matrix_buffer.h",
"message_queue.h",
"moving_average.cc", "moving_average.cc",
"moving_average.h", "moving_average.h",
"render_buffer.cc", "render_buffer.cc",

View File

@ -276,13 +276,13 @@ const int kNumberOfHighPassBiQuads_16kHz = 1;
class EchoCanceller3::RenderWriter { class EchoCanceller3::RenderWriter {
public: public:
RenderWriter( RenderWriter(ApmDataDumper* data_dumper,
ApmDataDumper* data_dumper, SwapQueue<std::vector<std::vector<float>>,
MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue, Aec3RenderQueueItemVerifier>* render_transfer_queue,
std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter, std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter,
int sample_rate_hz, int sample_rate_hz,
int frame_length, int frame_length,
int num_bands); int num_bands);
~RenderWriter(); ~RenderWriter();
void Insert(AudioBuffer* input); void Insert(AudioBuffer* input);
@ -293,13 +293,15 @@ class EchoCanceller3::RenderWriter {
const int num_bands_; const int num_bands_;
std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter_; std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter_;
std::vector<std::vector<float>> render_queue_input_frame_; std::vector<std::vector<float>> render_queue_input_frame_;
MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue_; SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>*
render_transfer_queue_;
RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RenderWriter); RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(RenderWriter);
}; };
EchoCanceller3::RenderWriter::RenderWriter( EchoCanceller3::RenderWriter::RenderWriter(
ApmDataDumper* data_dumper, ApmDataDumper* data_dumper,
MessageQueue<std::vector<std::vector<float>>>* render_transfer_queue, SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>*
render_transfer_queue,
std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter, std::unique_ptr<CascadedBiQuadFilter> render_highpass_filter,
int sample_rate_hz, int sample_rate_hz,
int frame_length, int frame_length,
@ -368,10 +370,12 @@ EchoCanceller3::EchoCanceller3(const EchoCanceller3Config& config,
output_framer_(num_bands_), output_framer_(num_bands_),
capture_blocker_(num_bands_), capture_blocker_(num_bands_),
render_blocker_(num_bands_), render_blocker_(num_bands_),
render_transfer_queue_(kRenderTransferQueueSizeFrames, render_transfer_queue_(
std::vector<std::vector<float>>( kRenderTransferQueueSizeFrames,
num_bands_, std::vector<std::vector<float>>(
std::vector<float>(frame_length_, 0.f))), num_bands_,
std::vector<float>(frame_length_, 0.f)),
Aec3RenderQueueItemVerifier(num_bands_, frame_length_)),
block_processor_(std::move(block_processor)), block_processor_(std::move(block_processor)),
render_queue_output_frame_(num_bands_, render_queue_output_frame_(num_bands_,
std::vector<float>(frame_length_, 0.f)), std::vector<float>(frame_length_, 0.f)),

View File

@ -24,12 +24,12 @@
#include "modules/audio_processing/aec3/block_processor.h" #include "modules/audio_processing/aec3/block_processor.h"
#include "modules/audio_processing/aec3/cascaded_biquad_filter.h" #include "modules/audio_processing/aec3/cascaded_biquad_filter.h"
#include "modules/audio_processing/aec3/frame_blocker.h" #include "modules/audio_processing/aec3/frame_blocker.h"
#include "modules/audio_processing/aec3/message_queue.h"
#include "modules/audio_processing/audio_buffer.h" #include "modules/audio_processing/audio_buffer.h"
#include "modules/audio_processing/logging/apm_data_dumper.h" #include "modules/audio_processing/logging/apm_data_dumper.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/constructor_magic.h" #include "rtc_base/constructor_magic.h"
#include "rtc_base/race_checker.h" #include "rtc_base/race_checker.h"
#include "rtc_base/swap_queue.h"
#include "rtc_base/thread_annotations.h" #include "rtc_base/thread_annotations.h"
namespace webrtc { namespace webrtc {
@ -107,7 +107,7 @@ class EchoCanceller3 : public EchoControl {
private: private:
class RenderWriter; class RenderWriter;
// Empties the render MessageQueue. // Empties the render SwapQueue.
void EmptyRenderQueue(); void EmptyRenderQueue();
rtc::RaceChecker capture_race_checker_; rtc::RaceChecker capture_race_checker_;
@ -127,7 +127,8 @@ class EchoCanceller3 : public EchoControl {
BlockFramer output_framer_ RTC_GUARDED_BY(capture_race_checker_); BlockFramer output_framer_ RTC_GUARDED_BY(capture_race_checker_);
FrameBlocker capture_blocker_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker capture_blocker_ RTC_GUARDED_BY(capture_race_checker_);
FrameBlocker render_blocker_ RTC_GUARDED_BY(capture_race_checker_); FrameBlocker render_blocker_ RTC_GUARDED_BY(capture_race_checker_);
MessageQueue<std::vector<std::vector<float>>> render_transfer_queue_; SwapQueue<std::vector<std::vector<float>>, Aec3RenderQueueItemVerifier>
render_transfer_queue_;
std::unique_ptr<BlockProcessor> block_processor_ std::unique_ptr<BlockProcessor> block_processor_
RTC_GUARDED_BY(capture_race_checker_); RTC_GUARDED_BY(capture_race_checker_);
std::vector<std::vector<float>> render_queue_output_frame_ std::vector<std::vector<float>> render_queue_output_frame_

View File

@ -461,7 +461,7 @@ class EchoCanceller3Tester {
// This test verifies that the swapqueue is able to handle jitter in the // This test verifies that the swapqueue is able to handle jitter in the
// capture and render API calls. // capture and render API calls.
void RunRenderMessageQueueVerificationTest() { void RunRenderSwapQueueVerificationTest() {
const EchoCanceller3Config config; const EchoCanceller3Config config;
EchoCanceller3 aec3( EchoCanceller3 aec3(
config, sample_rate_hz_, false, config, sample_rate_hz_, false,
@ -502,7 +502,7 @@ class EchoCanceller3Tester {
// This test verifies that a buffer overrun in the render swapqueue is // This test verifies that a buffer overrun in the render swapqueue is
// properly reported. // properly reported.
void RunRenderPipelineMessageQueueOverrunReturnValueTest() { void RunRenderPipelineSwapQueueOverrunReturnValueTest() {
EchoCanceller3 aec3(EchoCanceller3Config(), sample_rate_hz_, false); EchoCanceller3 aec3(EchoCanceller3Config(), sample_rate_hz_, false);
constexpr size_t kRenderTransferQueueSize = 30; constexpr size_t kRenderTransferQueueSize = 30;
@ -631,18 +631,18 @@ TEST(EchoCanceller3Buffering, RenderBitexactness) {
} }
} }
TEST(EchoCanceller3Buffering, RenderMessageQueue) { TEST(EchoCanceller3Buffering, RenderSwapQueue) {
for (auto rate : {8000, 16000}) { for (auto rate : {8000, 16000}) {
SCOPED_TRACE(ProduceDebugText(rate)); SCOPED_TRACE(ProduceDebugText(rate));
EchoCanceller3Tester(rate).RunRenderMessageQueueVerificationTest(); EchoCanceller3Tester(rate).RunRenderSwapQueueVerificationTest();
} }
} }
TEST(EchoCanceller3Buffering, RenderMessageQueueOverrunReturnValue) { TEST(EchoCanceller3Buffering, RenderSwapQueueOverrunReturnValue) {
for (auto rate : {8000, 16000, 32000, 48000}) { for (auto rate : {8000, 16000, 32000, 48000}) {
SCOPED_TRACE(ProduceDebugText(rate)); SCOPED_TRACE(ProduceDebugText(rate));
EchoCanceller3Tester(rate) EchoCanceller3Tester(rate)
.RunRenderPipelineMessageQueueOverrunReturnValueTest(); .RunRenderPipelineSwapQueueOverrunReturnValueTest();
} }
} }

View File

@ -1,103 +0,0 @@
/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#include <atomic>
#include <utility>
#include <vector>
#include "rtc_base/checks.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
// Fixed-size circular queue similar to SwapQueue, but lock-free and no
// QueueItemVerifierFunction.
// The queue is designed for single-producer-single-consumer (accessed by one
// producer thread, calling Insert(), and one consumer thread, calling Remove().
template <typename T>
class MessageQueue {
public:
explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) {
producer_thread_checker_.DetachFromThread();
consumer_thread_checker_.DetachFromThread();
}
MessageQueue(size_t size, const T& prototype)
: num_elements_(0), queue_(size, prototype) {
producer_thread_checker_.DetachFromThread();
consumer_thread_checker_.DetachFromThread();
}
~MessageQueue() = default;
// Inserts a T at the back of the queue by swapping *input with a T from the
// queue. This function should not be called concurrently. It can however be
// called concurrently with Remove(). Returns true if the item was inserted or
// false if not (the queue was full).
bool Insert(T* input) {
RTC_DCHECK_RUN_ON(&producer_thread_checker_);
RTC_DCHECK(input);
if (num_elements_ == queue_.size()) {
return false;
}
std::swap(*input, queue_[next_write_index_]);
++next_write_index_;
if (next_write_index_ == queue_.size()) {
next_write_index_ = 0;
}
++num_elements_;
RTC_DCHECK_LT(next_write_index_, queue_.size());
return true;
}
// Removes the frontmost T from the queue by swapping it with the T in
// *output. This function should not be called concurrently. It can however be
// called concurrently with Insert(). Returns true if an item could be removed
// or false if not (the queue was empty).
bool Remove(T* output) {
RTC_DCHECK_RUN_ON(&consumer_thread_checker_);
RTC_DCHECK(output);
if (num_elements_ == 0) {
return false;
}
std::swap(*output, queue_[next_read_index_]);
++next_read_index_;
if (next_read_index_ == queue_.size()) {
next_read_index_ = 0;
}
--num_elements_;
RTC_DCHECK_LT(next_read_index_, queue_.size());
return true;
}
private:
uint32_t next_write_index_ = 0;
uint32_t next_read_index_ = 0;
rtc::ThreadChecker producer_thread_checker_;
rtc::ThreadChecker consumer_thread_checker_;
std::atomic<uint32_t> num_elements_;
std::vector<T> queue_;
};
} // namespace webrtc
#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_