From 47cf5eaca4d2172fceda698899652fd309bc9b9b Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Tue, 19 Feb 2019 20:20:16 +0100 Subject: [PATCH] Migrate gcd task queue implementation to TaskQueueBase interface Bug: webrtc:10191 Change-Id: If15138f97445484668d3e42f3a35875521c38545 Reviewed-on: https://webrtc-review.googlesource.com/c/122501 Reviewed-by: Karl Wiberg Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/master@{#26782} --- DEPS | 1 + abseil-in-webrtc.md | 1 + api/task_queue/BUILD.gn | 6 + .../default_task_queue_factory_gcd.cc | 21 ++ api/task_queue/task_queue_base.cc | 44 ++++ rtc_base/BUILD.gn | 19 +- rtc_base/task_queue_gcd.cc | 191 ++++++++---------- rtc_base/task_queue_gcd.h | 24 +++ rtc_base/task_queue_posix.cc | 40 ---- rtc_base/task_queue_posix.h | 36 ---- 10 files changed, 185 insertions(+), 198 deletions(-) create mode 100644 api/task_queue/default_task_queue_factory_gcd.cc create mode 100644 rtc_base/task_queue_gcd.h delete mode 100644 rtc_base/task_queue_posix.cc delete mode 100644 rtc_base/task_queue_posix.h diff --git a/DEPS b/DEPS index f414d24e8a..968f81cce8 100644 --- a/DEPS +++ b/DEPS @@ -1449,6 +1449,7 @@ include_rules = [ "+absl/algorithm/algorithm.h", "+absl/algorithm/container.h", "+absl/base/attributes.h", + "+absl/base/config.h", "+absl/container/inlined_vector.h", "+absl/memory/memory.h", "+absl/meta/type_traits.h", diff --git a/abseil-in-webrtc.md b/abseil-in-webrtc.md index ba317c43ed..7739cfa120 100644 --- a/abseil-in-webrtc.md +++ b/abseil-in-webrtc.md @@ -23,6 +23,7 @@ adds the first use. * `absl::variant` and related stuff from `absl/types/variant.h`. * The functions in `absl/algorithm/algorithm.h` and `absl/algorithm/container.h` +* The macros in `absl/base/attributes.h` and `absl/base/config.h` ## **Disallowed** diff --git a/api/task_queue/BUILD.gn b/api/task_queue/BUILD.gn index ebb7e39dec..59aed762d5 100644 --- a/api/task_queue/BUILD.gn +++ b/api/task_queue/BUILD.gn @@ -34,6 +34,7 @@ rtc_source_set("task_queue_factory") { ":task_queue", "../../rtc_base:checks", "../../rtc_base:rtc_task_queue_api", + "//third_party/abseil-cpp/absl/base:config", "//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/strings", ] @@ -93,6 +94,11 @@ rtc_source_set("default_task_queue_factory_impl") { "default_task_queue_factory_libevent.cc", ] deps += [ "../../rtc_base:rtc_task_queue_libevent" ] + } else if (is_mac || is_ios) { + sources = [ + "default_task_queue_factory_gcd.cc", + ] + deps += [ "../../rtc_base:rtc_task_queue_gcd" ] } else { sources = [ "default_task_queue_factory_unimplemented.cc", diff --git a/api/task_queue/default_task_queue_factory_gcd.cc b/api/task_queue/default_task_queue_factory_gcd.cc new file mode 100644 index 0000000000..7e17b4846d --- /dev/null +++ b/api/task_queue/default_task_queue_factory_gcd.cc @@ -0,0 +1,21 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#include + +#include "api/task_queue/task_queue_factory.h" +#include "rtc_base/task_queue_gcd.h" + +namespace webrtc { + +std::unique_ptr CreateDefaultTaskQueueFactory() { + return CreateTaskQueueGcdFactory(); +} + +} // namespace webrtc diff --git a/api/task_queue/task_queue_base.cc b/api/task_queue/task_queue_base.cc index 409eb49bf5..7d3539a63d 100644 --- a/api/task_queue/task_queue_base.cc +++ b/api/task_queue/task_queue_base.cc @@ -10,6 +10,10 @@ #include "api/task_queue/task_queue_base.h" #include "absl/base/attributes.h" +#include "absl/base/config.h" +#include "rtc_base/checks.h" + +#if defined(ABSL_HAVE_THREAD_LOCAL) namespace webrtc { namespace { @@ -31,5 +35,45 @@ TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter( TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() { current = previous_; } +} // namespace webrtc + +#elif defined(WEBRTC_POSIX) + +#include + +namespace webrtc { +namespace { + +ABSL_CONST_INIT pthread_key_t g_queue_ptr_tls = 0; + +void InitializeTls() { + RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0); +} + +pthread_key_t GetQueuePtrTls() { + static pthread_once_t init_once = PTHREAD_ONCE_INIT; + RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0); + return g_queue_ptr_tls; +} + +} // namespace + +TaskQueueBase* TaskQueueBase::Current() { + return static_cast(pthread_getspecific(GetQueuePtrTls())); +} + +TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter( + TaskQueueBase* task_queue) + : previous_(TaskQueueBase::Current()) { + pthread_setspecific(GetQueuePtrTls(), task_queue); +} + +TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() { + pthread_setspecific(GetQueuePtrTls(), previous_); +} } // namespace webrtc + +#else +#error Unsupported platform +#endif diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 9809a8ed5f..ff8af81a68 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -534,18 +534,18 @@ if (rtc_enable_libevent) { if (is_mac || is_ios) { rtc_source_set("rtc_task_queue_gcd") { - visibility = [ ":rtc_task_queue_impl" ] + visibility = [ "../api/task_queue:default_task_queue_factory_impl" ] sources = [ "task_queue_gcd.cc", - "task_queue_posix.cc", - "task_queue_posix.h", + "task_queue_gcd.h", ] deps = [ ":checks", ":logging", - ":refcount", - ":rtc_task_queue_api", - "../api:scoped_refptr", + "../api/task_queue", + "../api/task_queue:task_queue_factory", + "//third_party/abseil-cpp/absl/memory", + "//third_party/abseil-cpp/absl/strings", ] } } @@ -594,17 +594,12 @@ rtc_source_set("rtc_task_queue_stdlib") { rtc_source_set("rtc_task_queue_impl") { visibility = [ "*" ] - if (rtc_enable_libevent) { + if (rtc_enable_libevent || is_mac || is_ios) { deps = [ "../api/task_queue:default_task_queue_factory_impl", "../api/task_queue:global_task_queue_factory", ] } else { - if (is_mac || is_ios) { - deps = [ - ":rtc_task_queue_gcd", - ] - } if (is_win) { if (current_os == "winuwp") { deps = [ diff --git a/rtc_base/task_queue_gcd.cc b/rtc_base/task_queue_gcd.cc index 405edab403..c131d82c57 100644 --- a/rtc_base/task_queue_gcd.cc +++ b/rtc_base/task_queue_gcd.cc @@ -12,171 +12,142 @@ // The implementation uses Grand Central Dispatch queues (GCD) to // do the actual task queuing. -#include "rtc_base/task_queue.h" +#include "rtc_base/task_queue_gcd.h" #include #include +#include "absl/memory/memory.h" +#include "absl/strings/string_view.h" +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "rtc_base/checks.h" #include "rtc_base/logging.h" -#include "rtc_base/ref_count.h" -#include "rtc_base/ref_counted_object.h" -#include "rtc_base/task_queue_posix.h" -namespace rtc { +namespace webrtc { namespace { -using Priority = TaskQueue::Priority; - -int TaskQueuePriorityToGCD(Priority priority) { +int TaskQueuePriorityToGCD(TaskQueueFactory::Priority priority) { switch (priority) { - case Priority::NORMAL: + case TaskQueueFactory::Priority::NORMAL: return DISPATCH_QUEUE_PRIORITY_DEFAULT; - case Priority::HIGH: + case TaskQueueFactory::Priority::HIGH: return DISPATCH_QUEUE_PRIORITY_HIGH; - case Priority::LOW: + case TaskQueueFactory::Priority::LOW: return DISPATCH_QUEUE_PRIORITY_LOW; } } -} // namespace -using internal::GetQueuePtrTls; -using internal::AutoSetCurrentQueuePtr; - -class TaskQueue::Impl : public RefCountInterface { +class TaskQueueGcd : public TaskQueueBase { public: - Impl(const char* queue_name, TaskQueue* task_queue, Priority priority); - ~Impl() override; + TaskQueueGcd(absl::string_view queue_name, int gcd_priority); - static TaskQueue* Current(); - - // Used for DCHECKing the current queue. - bool IsCurrent() const; - - void PostTask(std::unique_ptr task); - void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds); + void Delete() override; + void PostTask(std::unique_ptr task) override; + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override; private: - struct QueueContext { - explicit QueueContext(TaskQueue* q) : queue(q), is_active(true) {} - - static void SetNotActive(void* context) { - QueueContext* qc = static_cast(context); - qc->is_active = false; - } - - static void DeleteContext(void* context) { - QueueContext* qc = static_cast(context); - delete qc; - } - - TaskQueue* const queue; - bool is_active; - }; - struct TaskContext { - TaskContext(QueueContext* queue_ctx, std::unique_ptr task) - : queue_ctx(queue_ctx), task(std::move(task)) {} - virtual ~TaskContext() {} + TaskContext(TaskQueueGcd* queue, std::unique_ptr task) + : queue(queue), task(std::move(task)) {} - static void RunTask(void* context) { - std::unique_ptr tc(static_cast(context)); - if (tc->queue_ctx->is_active) { - AutoSetCurrentQueuePtr set_current(tc->queue_ctx->queue); - if (!tc->task->Run()) - tc->task.release(); - } - } - - QueueContext* const queue_ctx; + TaskQueueGcd* const queue; std::unique_ptr task; }; + ~TaskQueueGcd() override; + static void RunTask(void* task_context); + static void SetNotActive(void* task_queue); + static void DeleteQueue(void* task_queue); + dispatch_queue_t queue_; - QueueContext* const context_; + bool is_active_; }; -TaskQueue::Impl::Impl(const char* queue_name, - TaskQueue* task_queue, - Priority priority) - : queue_(dispatch_queue_create(queue_name, DISPATCH_QUEUE_SERIAL)), - context_(new QueueContext(task_queue)) { - RTC_DCHECK(queue_name); +TaskQueueGcd::TaskQueueGcd(absl::string_view queue_name, int gcd_priority) + : queue_(dispatch_queue_create(std::string(queue_name).c_str(), + DISPATCH_QUEUE_SERIAL)), + is_active_(true) { RTC_CHECK(queue_); - dispatch_set_context(queue_, context_); - // Assign a finalizer that will delete the context when the last reference - // to the queue is released. This may run after the TaskQueue object has - // been deleted. - dispatch_set_finalizer_f(queue_, &QueueContext::DeleteContext); + dispatch_set_context(queue_, this); + // Assign a finalizer that will delete the queue when the last reference + // is released. This may run after the TaskQueue::Delete. + dispatch_set_finalizer_f(queue_, &DeleteQueue); - dispatch_set_target_queue( - queue_, dispatch_get_global_queue(TaskQueuePriorityToGCD(priority), 0)); + dispatch_set_target_queue(queue_, dispatch_get_global_queue(gcd_priority, 0)); } -TaskQueue::Impl::~Impl() { +TaskQueueGcd::~TaskQueueGcd() = default; + +void TaskQueueGcd::Delete() { RTC_DCHECK(!IsCurrent()); // Implementation/behavioral note: // Dispatch queues are reference counted via calls to dispatch_retain and // dispatch_release. Pending blocks submitted to a queue also hold a // reference to the queue until they have finished. Once all references to a // queue have been released, the queue will be deallocated by the system. - // This is why we check the context before running tasks. + // This is why we check the is_active_ before running tasks. - // Use dispatch_sync to set the context to null to guarantee that there's not - // a race between checking the context and using it from a task. - dispatch_sync_f(queue_, context_, &QueueContext::SetNotActive); + // Use dispatch_sync to set the is_active_ to guarantee that there's not a + // race with checking it from a task. + dispatch_sync_f(queue_, this, &SetNotActive); dispatch_release(queue_); } -// static -TaskQueue* TaskQueue::Impl::Current() { - return static_cast(pthread_getspecific(GetQueuePtrTls())); +void TaskQueueGcd::PostTask(std::unique_ptr task) { + auto* context = new TaskContext(this, std::move(task)); + dispatch_async_f(queue_, context, &RunTask); } -bool TaskQueue::Impl::IsCurrent() const { - RTC_DCHECK(queue_); - const TaskQueue* current = Current(); - return current && this == current->impl_.get(); -} - -void TaskQueue::Impl::PostTask(std::unique_ptr task) { - auto* context = new TaskContext(context_, std::move(task)); - dispatch_async_f(queue_, context, &TaskContext::RunTask); -} - -void TaskQueue::Impl::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - auto* context = new TaskContext(context_, std::move(task)); +void TaskQueueGcd::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + auto* context = new TaskContext(this, std::move(task)); dispatch_after_f( dispatch_time(DISPATCH_TIME_NOW, milliseconds * NSEC_PER_MSEC), queue_, - context, &TaskContext::RunTask); + context, &RunTask); } -// Boilerplate for the PIMPL pattern. -TaskQueue::TaskQueue(const char* queue_name, Priority priority) - : impl_(new RefCountedObject(queue_name, this, priority)) { -} - -TaskQueue::~TaskQueue() {} - // static -TaskQueue* TaskQueue::Current() { - return TaskQueue::Impl::Current(); +void TaskQueueGcd::RunTask(void* task_context) { + std::unique_ptr tc(static_cast(task_context)); + if (!tc->queue->is_active_) + return; + + CurrentTaskQueueSetter set_current(tc->queue); + auto* task = tc->task.release(); + if (task->Run()) { + // Delete the task before CurrentTaskQueueSetter clears state that this code + // is running on the task queue. + delete task; + } } -// Used for DCHECKing the current queue. -bool TaskQueue::IsCurrent() const { - return impl_->IsCurrent(); +// static +void TaskQueueGcd::SetNotActive(void* task_queue) { + static_cast(task_queue)->is_active_ = false; } -void TaskQueue::PostTask(std::unique_ptr task) { - return TaskQueue::impl_->PostTask(std::move(task)); +// static +void TaskQueueGcd::DeleteQueue(void* task_queue) { + delete static_cast(task_queue); } -void TaskQueue::PostDelayedTask(std::unique_ptr task, - uint32_t milliseconds) { - return TaskQueue::impl_->PostDelayedTask(std::move(task), milliseconds); +class TaskQueueGcdFactory final : public TaskQueueFactory { + public: + std::unique_ptr CreateTaskQueue( + absl::string_view name, + Priority priority) const override { + return std::unique_ptr( + new TaskQueueGcd(name, TaskQueuePriorityToGCD(priority))); + } +}; + +} // namespace + +std::unique_ptr CreateTaskQueueGcdFactory() { + return absl::make_unique(); } -} // namespace rtc +} // namespace webrtc diff --git a/rtc_base/task_queue_gcd.h b/rtc_base/task_queue_gcd.h new file mode 100644 index 0000000000..dc6039e99a --- /dev/null +++ b/rtc_base/task_queue_gcd.h @@ -0,0 +1,24 @@ +/* + * Copyright 2019 The WebRTC Project Authors. All rights reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef RTC_BASE_TASK_QUEUE_GCD_H_ +#define RTC_BASE_TASK_QUEUE_GCD_H_ + +#include + +#include "api/task_queue/task_queue_factory.h" + +namespace webrtc { + +std::unique_ptr CreateTaskQueueGcdFactory(); + +} // namespace webrtc + +#endif // RTC_BASE_TASK_QUEUE_GCD_H_ diff --git a/rtc_base/task_queue_posix.cc b/rtc_base/task_queue_posix.cc deleted file mode 100644 index 520b8e9509..0000000000 --- a/rtc_base/task_queue_posix.cc +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#include "rtc_base/task_queue_posix.h" - -#include "rtc_base/checks.h" -#include "rtc_base/task_queue.h" - -namespace rtc { -namespace internal { -pthread_key_t g_queue_ptr_tls = 0; - -void InitializeTls() { - RTC_CHECK(pthread_key_create(&g_queue_ptr_tls, nullptr) == 0); -} - -pthread_key_t GetQueuePtrTls() { - static pthread_once_t init_once = PTHREAD_ONCE_INIT; - RTC_CHECK(pthread_once(&init_once, &InitializeTls) == 0); - return g_queue_ptr_tls; -} - -AutoSetCurrentQueuePtr::AutoSetCurrentQueuePtr(TaskQueue* q) - : prev_(TaskQueue::Current()) { - pthread_setspecific(GetQueuePtrTls(), q); -} - -AutoSetCurrentQueuePtr::~AutoSetCurrentQueuePtr() { - pthread_setspecific(GetQueuePtrTls(), prev_); -} - -} // namespace internal -} // namespace rtc diff --git a/rtc_base/task_queue_posix.h b/rtc_base/task_queue_posix.h deleted file mode 100644 index 3014e2011d..0000000000 --- a/rtc_base/task_queue_posix.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2016 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ - -#ifndef RTC_BASE_TASK_QUEUE_POSIX_H_ -#define RTC_BASE_TASK_QUEUE_POSIX_H_ - -#include - -namespace rtc { - -class TaskQueue; - -namespace internal { - -class AutoSetCurrentQueuePtr { - public: - explicit AutoSetCurrentQueuePtr(TaskQueue* q); - ~AutoSetCurrentQueuePtr(); - - private: - TaskQueue* const prev_; -}; - -pthread_key_t GetQueuePtrTls(); - -} // namespace internal -} // namespace rtc - -#endif // RTC_BASE_TASK_QUEUE_POSIX_H_