diff --git a/logging/rtc_event_log/rtc_event_log_impl.cc b/logging/rtc_event_log/rtc_event_log_impl.cc index 8735cb4782..e1e1aabec0 100644 --- a/logging/rtc_event_log/rtc_event_log_impl.cc +++ b/logging/rtc_event_log/rtc_event_log_impl.cc @@ -114,15 +114,11 @@ bool RtcEventLogImpl::StartLogging(std::unique_ptr output, void RtcEventLogImpl::StopLogging() { RTC_LOG(LS_INFO) << "Stopping WebRTC event log."; - + // TODO(danilchap): Do not block current thread waiting on the task queue. + // It might work for now, for current callers, but disallows caller to share + // threads with the |task_queue_|. rtc::Event output_stopped; StopLogging([&output_stopped]() { output_stopped.Set(); }); - - // By making sure StopLogging() is not executed on a task queue, - // we ensure it's not running on a thread that is shared with |task_queue_|, - // meaning the following Wait() will not block forever. - RTC_DCHECK(TaskQueueBase::Current() == nullptr); - output_stopped.Wait(rtc::Event::kForever); RTC_LOG(LS_INFO) << "WebRTC event log successfully stopped."; diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index d189fb0923..0aee6d1165 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -786,6 +786,7 @@ rtc_library("rtc_base") { ":stringutils", "../api:array_view", "../api:scoped_refptr", + "../api/task_queue", "network:sent_packet", "system:file_wrapper", "system:rtc_export", @@ -1368,6 +1369,8 @@ if (rtc_include_tests) { ":stringutils", ":testclient", "../api:array_view", + "../api/task_queue", + "../api/task_queue:task_queue_test", "../test:fileutils", "../test:test_main", "../test:test_support", diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 20f58b2c81..0b8905e922 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -335,6 +335,7 @@ void* Thread::PreRun(void* pv) { Thread* thread = static_cast(pv); ThreadManager::Instance()->SetCurrentThread(thread); rtc::SetCurrentThreadName(thread->name_.c_str()); + CurrentTaskQueueSetter set_current_task_queue(thread); #if defined(WEBRTC_MAC) ScopedAutoReleasePool pool; #endif @@ -475,6 +476,41 @@ void Thread::InvokeInternal(const Location& posted_from, Send(posted_from, handler); } +void Thread::QueuedTaskHandler::OnMessage(Message* msg) { + RTC_DCHECK(msg); + auto* data = static_cast*>(msg->pdata); + std::unique_ptr task = std::move(data->data()); + // MessageQueue expects handler to own Message::pdata when OnMessage is called + // Since MessageData is no longer needed, delete it. + delete data; + + // QueuedTask interface uses Run return value to communicate who owns the + // task. false means QueuedTask took the ownership. + if (!task->Run()) + task.release(); +} + +void Thread::PostTask(std::unique_ptr task) { + // Though Post takes MessageData by raw pointer (last parameter), it still + // takes it with ownership. + Post(RTC_FROM_HERE, &queued_task_handler_, + /*id=*/0, new ScopedMessageData(std::move(task))); +} + +void Thread::PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) { + // Though PostDelayed takes MessageData by raw pointer (last parameter), + // it still takes it with ownership. + PostDelayed(RTC_FROM_HERE, milliseconds, &queued_task_handler_, + /*id=*/0, + new ScopedMessageData(std::move(task))); +} + +void Thread::Delete() { + Stop(); + delete this; +} + bool Thread::IsProcessingMessagesForTesting() { return (owned_ || IsCurrent()) && MessageQueue::IsProcessingMessagesForTesting(); diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 41052dfb1d..186d7f4c4d 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -21,6 +21,8 @@ #if defined(WEBRTC_POSIX) #include #endif +#include "api/task_queue/queued_task.h" +#include "api/task_queue/task_queue_base.h" #include "rtc_base/constructor_magic.h" #include "rtc_base/location.h" #include "rtc_base/message_handler.h" @@ -133,7 +135,8 @@ struct _SendMessage { // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). -class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue { +class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, + public webrtc::TaskQueueBase { public: explicit Thread(SocketServer* ss); explicit Thread(std::unique_ptr ss); @@ -263,6 +266,12 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue { std::forward(functor))); } + // From TaskQueueBase + void PostTask(std::unique_ptr task) override; + void PostDelayedTask(std::unique_ptr task, + uint32_t milliseconds) override; + void Delete() override; + // From MessageQueue bool IsProcessingMessagesForTesting() override; void Clear(MessageHandler* phandler, @@ -325,6 +334,10 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue { friend class ScopedDisallowBlockingCalls; private: + class QueuedTaskHandler final : public MessageHandler { + public: + void OnMessage(Message* msg) override; + }; // Sets the per-thread allow-blocking-calls flag and returns the previous // value. Must be called on this thread. bool SetAllowBlockingCalls(bool allow); @@ -381,6 +394,9 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue { // Only touched from the worker thread itself. bool blocking_calls_allowed_ = true; + // Runs webrtc::QueuedTask posted to the Thread. + QueuedTaskHandler queued_task_handler_; + friend class ThreadManager; RTC_DISALLOW_COPY_AND_ASSIGN(Thread); diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index 0e5026d112..8147c90a35 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -12,6 +12,8 @@ #include +#include "api/task_queue/task_queue_factory.h" +#include "api/task_queue/task_queue_test.h" #include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" #include "rtc_base/event.h" @@ -900,5 +902,23 @@ TEST(ThreadPostTaskTest, InvokesInPostedOrder) { fourth.Wait(Event::kForever); } +class ThreadFactory : public webrtc::TaskQueueFactory { + public: + std::unique_ptr + CreateTaskQueue(absl::string_view /* name */, + Priority /*priority*/) const override { + std::unique_ptr thread = Thread::Create(); + thread->Start(); + return std::unique_ptr( + thread.release()); + } +}; + +using ::webrtc::TaskQueueTest; + +INSTANTIATE_TEST_SUITE_P(RtcThread, + TaskQueueTest, + ::testing::Values(std::make_unique)); + } // namespace } // namespace rtc