Files
platform-external-webrtc/modules/audio_processing/aec3/message_queue.h
Gustaf Ullberg 74ba99062c AEC3: Lockless transfer of render data to the capture thread
This CL implements a lockless queue that replaces SwapQueue
in the RenderWriter. This avoids stalls when the render and
capture threads are accessing the queue at the same time.

Bug: webrtc:10205
Change-Id: Ie7d6fcf9c80fad957e2a90537658fb730ca2ed72
Reviewed-on: https://webrtc-review.googlesource.com/c/117643
Reviewed-by: Per Åhgren <peah@webrtc.org>
Commit-Queue: Gustaf Ullberg <gustaf@webrtc.org>
Cr-Commit-Position: refs/heads/master@{#26298}
2019-01-17 14:41:18 +00:00

104 lines
3.0 KiB
C++

/*
* Copyright (c) 2019 The WebRTC project authors. All Rights Reserved.
*
* Use of this source code is governed by a BSD-style license
* that can be found in the LICENSE file in the root of the source
* tree. An additional intellectual property rights grant can be found
* in the file PATENTS. All contributing project authors may
* be found in the AUTHORS file in the root of the source tree.
*/
#ifndef MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#define MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_
#include <atomic>
#include <utility>
#include <vector>
#include "rtc_base/checks.h"
#include "rtc_base/thread_checker.h"
namespace webrtc {
// Fixed-size circular queue similar to SwapQueue, but lock-free and without a
// QueueItemVerifierFunction.
// The queue is designed for single-producer-single-consumer use: it is accessed
// by one producer thread, calling Insert(), and one consumer thread, calling
// Remove().
template <typename T>
class MessageQueue {
 public:
  // Builds a queue backed by `size` value-initialized elements.
  explicit MessageQueue(size_t size) : num_elements_(0), queue_(size) {
    DetachThreadCheckers();
  }

  // Builds a queue backed by `size` copies of `prototype`.
  MessageQueue(size_t size, const T& prototype)
      : num_elements_(0), queue_(size, prototype) {
    DetachThreadCheckers();
  }

  ~MessageQueue() = default;

  // Swaps *input with the element at the back of the queue. Must not be
  // called concurrently with itself, but may run concurrently with Remove().
  // Returns false, leaving *input untouched, when the queue is full; returns
  // true once the item has been handed over.
  bool Insert(T* input) {
    RTC_DCHECK_RUN_ON(&producer_thread_checker_);
    RTC_DCHECK(input);

    const bool is_full = num_elements_.load() == queue_.size();
    if (is_full) {
      return false;
    }

    // The slot is filled before the element count is incremented, so the
    // count never covers a slot that has not been written yet.
    std::swap(*input, queue_[next_write_index_]);
    next_write_index_ = NextIndex(next_write_index_);
    num_elements_.fetch_add(1);

    RTC_DCHECK_LT(next_write_index_, queue_.size());
    return true;
  }

  // Swaps *output with the element at the front of the queue. Must not be
  // called concurrently with itself, but may run concurrently with Insert().
  // Returns false, leaving *output untouched, when the queue is empty;
  // returns true once an item has been handed over.
  bool Remove(T* output) {
    RTC_DCHECK_RUN_ON(&consumer_thread_checker_);
    RTC_DCHECK(output);

    const bool is_empty = num_elements_.load() == 0;
    if (is_empty) {
      return false;
    }

    // The slot is emptied before the element count is decremented, so the
    // count never frees a slot that is still being read.
    std::swap(*output, queue_[next_read_index_]);
    next_read_index_ = NextIndex(next_read_index_);
    num_elements_.fetch_sub(1);

    RTC_DCHECK_LT(next_read_index_, queue_.size());
    return true;
  }

 private:
  // Detaches both thread checkers at construction time.
  // NOTE(review): presumably this lets each checker bind to whichever thread
  // first calls Insert()/Remove() rather than to the constructing thread --
  // confirm against rtc::ThreadChecker.
  void DetachThreadCheckers() {
    producer_thread_checker_.DetachFromThread();
    consumer_thread_checker_.DetachFromThread();
  }

  // Returns the position following `index`, wrapping to 0 at the end of the
  // backing storage.
  uint32_t NextIndex(uint32_t index) const {
    ++index;
    return index == queue_.size() ? 0 : index;
  }

  // Slot the next Insert() writes to; only touched by Insert().
  uint32_t next_write_index_ = 0;
  // Slot the next Remove() reads from; only touched by Remove().
  uint32_t next_read_index_ = 0;
  rtc::ThreadChecker producer_thread_checker_;
  rtc::ThreadChecker consumer_thread_checker_;
  // Number of occupied slots; the only member modified by both Insert() and
  // Remove().
  std::atomic<uint32_t> num_elements_;
  // Fixed-size backing storage, allocated once in the constructor.
  std::vector<T> queue_;
};
} // namespace webrtc
#endif // MODULES_AUDIO_PROCESSING_AEC3_MESSAGE_QUEUE_H_