diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index 73446952b2..e259fc872c 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -914,6 +914,8 @@ rtc_library("threading") { absl_deps = [ "//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/base:core_headers", + "//third_party/abseil-cpp/absl/cleanup", + "//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/strings", ] deps = [ @@ -943,6 +945,7 @@ rtc_library("threading") { "../api/task_queue", "../api/task_queue:pending_task_safety_flag", "../api/task_queue:to_queued_task", + "../api/units:time_delta", "synchronization:mutex", "system:no_unique_address", "system:rtc_export", @@ -1730,6 +1733,7 @@ if (rtc_include_tests) { "../api/task_queue:pending_task_safety_flag", "../api/task_queue:task_queue_test", "../api/task_queue:to_queued_task", + "../api/units:time_delta", "../test:field_trial", "../test:fileutils", "../test:rtc_expect_death", diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index e1db2bc24f..ef165ffca8 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -31,8 +31,8 @@ #include #include "absl/algorithm/container.h" +#include "absl/cleanup/cleanup.h" #include "api/sequence_checker.h" -#include "api/task_queue/to_queued_task.h" #include "rtc_base/checks.h" #include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/event.h" @@ -72,22 +72,25 @@ class ScopedAutoReleasePool { namespace rtc { namespace { -class MessageHandlerWithTask final : public MessageHandler { +struct AnyInvocableMessage final : public MessageData { + explicit AnyInvocableMessage(absl::AnyInvocable task) + : task(std::move(task)) {} + absl::AnyInvocable task; +}; + +class AnyInvocableMessageHandler final : public MessageHandler { public: - MessageHandlerWithTask() {} - - MessageHandlerWithTask(const MessageHandlerWithTask&) = delete; - MessageHandlerWithTask& operator=(const MessageHandlerWithTask&) = delete; - void OnMessage(Message* msg) override { - static_cast(msg->pdata)->Run(); + std::move(static_cast(msg->pdata)->task)(); delete msg->pdata; } - - private: - ~MessageHandlerWithTask() override {} }; +MessageHandler* GetAnyInvocableMessageHandler() { + static MessageHandler* const handler = new AnyInvocableMessageHandler; + return handler; +} + class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { public: MarkProcessingCritScope(const RecursiveCriticalSection* cs, @@ -761,8 +764,7 @@ bool Thread::SetName(absl::string_view name, const void* obj) { void Thread::SetDispatchWarningMs(int deadline) { if (!IsCurrent()) { - PostTask(webrtc::ToQueuedTask( - [this, deadline]() { SetDispatchWarningMs(deadline); })); + PostTask([this, deadline]() { SetDispatchWarningMs(deadline); }); return; } RTC_DCHECK_RUN_ON(this); @@ -948,18 +950,19 @@ void Thread::Send(const Location& posted_from, done_event.reset(new rtc::Event()); bool ready = false; - PostTask(webrtc::ToQueuedTask( - [&msg]() mutable { msg.phandler->OnMessage(&msg); }, - [this, &ready, current_thread, done = done_event.get()] { - if (current_thread) { - CritScope cs(&crit_); - ready = true; - current_thread->socketserver()->WakeUp(); - } else { - done->Set(); - } - })); - + absl::Cleanup cleanup = [this, &ready, current_thread, + done = done_event.get()] { + if (current_thread) { + CritScope cs(&crit_); + ready = true; + current_thread->socketserver()->WakeUp(); + } else { + done->Set(); + } + }; + PostTask([&msg, cleanup = std::move(cleanup)]() mutable { + msg.phandler->OnMessage(&msg); + }); if (current_thread) { bool waited = false; crit_.Enter(); @@ -1115,6 +1118,28 @@ void Thread::Delete() { delete this; } +void Thread::PostTask(absl::AnyInvocable task) { + // Though Post takes MessageData by raw pointer (last parameter), it still + // takes it with ownership. + Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(), + /*id=*/0, new AnyInvocableMessage(std::move(task))); +} + +void Thread::PostDelayedTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) { + // This implementation does not support low precision yet. + PostDelayedHighPrecisionTask(std::move(task), delay); +} + +void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) { + int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms(); + // Though PostDelayed takes MessageData by raw pointer (last parameter), + // it still takes it with ownership. + PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(), + /*id=*/0, new AnyInvocableMessage(std::move(task))); +} + bool Thread::IsProcessingMessagesForTesting() { return (owned_ || IsCurrent()) && !IsQuitting(); } @@ -1183,13 +1208,6 @@ bool Thread::IsRunning() { #endif } -// static -MessageHandler* Thread::GetPostTaskMessageHandler() { - // Allocate at first call, never deallocate. - static MessageHandler* handler = new MessageHandlerWithTask; - return handler; -} - AutoThread::AutoThread() : Thread(CreateDefaultSocketServer(), /*do_init=*/false) { if (!ThreadManager::Instance()->CurrentThread()) { diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 33646843dd..e87248c01f 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -28,10 +28,12 @@ #include #endif #include "absl/base/attributes.h" +#include "absl/functional/any_invocable.h" #include "api/function_view.h" #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "api/task_queue/to_queued_task.h" +#include "api/units/time_delta.h" #include "rtc_base/checks.h" #include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/location.h" @@ -79,32 +81,6 @@ namespace rtc { class Thread; -namespace rtc_thread_internal { - -class MessageLikeTask : public MessageData { - public: - virtual void Run() = 0; -}; - -template -class MessageWithFunctor final : public MessageLikeTask { - public: - explicit MessageWithFunctor(FunctorT&& functor) - : functor_(std::forward(functor)) {} - - MessageWithFunctor(const MessageWithFunctor&) = delete; - MessageWithFunctor& operator=(const MessageWithFunctor&) = delete; - - void Run() override { functor_(); } - - private: - ~MessageWithFunctor() override {} - - typename std::remove_reference::type functor_; -}; - -} // namespace rtc_thread_internal - class RTC_EXPORT ThreadManager { public: static const int kForever = -1; @@ -418,36 +394,29 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { bool IsInvokeToThreadAllowed(rtc::Thread* target); // From TaskQueueBase + void Delete() override; + void PostTask(absl::AnyInvocable task) override; + void PostDelayedTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) override; + void PostDelayedHighPrecisionTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) override; + + // Legacy TaskQueueBase methods, do not use in new code. + // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread + // directly is updated to use PostTask methods above. void PostTask(std::unique_ptr task) override; void PostDelayedTask(std::unique_ptr task, uint32_t milliseconds) override; void PostDelayedHighPrecisionTask(std::unique_ptr task, uint32_t milliseconds) override; - void Delete() override; - // Helper methods to avoid having to do ToQueuedTask() at the calling places. - template >::value>::type* = nullptr> - void PostTask(Closure&& closure) { - PostTask(webrtc::ToQueuedTask(std::forward(closure))); - } - template >::value>::type* = nullptr> - void PostDelayedTask(Closure&& closure, uint32_t milliseconds) { - PostDelayedTask(webrtc::ToQueuedTask(std::forward(closure)), - milliseconds); - } - template >::value>::type* = nullptr> - void PostDelayedHighPrecisionTask(Closure&& closure, uint32_t milliseconds) { - PostDelayedHighPrecisionTask( - webrtc::ToQueuedTask(std::forward(closure)), milliseconds); + // Legacy helper method, do not use in new code. + // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread + // directly is updated to use PostTask methods above. + ABSL_DEPRECATED("Pass delay as webrtc::TimeDelta type") + void PostDelayedTask(absl::AnyInvocable task, + uint32_t milliseconds) { + PostDelayedTask(std::move(task), webrtc::TimeDelta::Millis(milliseconds)); } // ProcessMessages will process I/O and dispatch messages until: @@ -609,10 +578,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Called by the ThreadManager when being unset as the current thread. void ClearCurrentTaskQueue(); - // Returns a static-lifetime MessageHandler which runs message with - // MessageLikeTask payload data. - static MessageHandler* GetPostTaskMessageHandler(); - bool fPeekKeep_; Message msgPeek_; MessageList messages_ RTC_GUARDED_BY(crit_); diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index 321cbc3a22..7fcf7ca833 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -14,7 +14,7 @@ #include "api/task_queue/task_queue_factory.h" #include "api/task_queue/task_queue_test.h" -#include "api/task_queue/to_queued_task.h" +#include "api/units/time_delta.h" #include "rtc_base/async_invoker.h" #include "rtc_base/async_udp_socket.h" #include "rtc_base/checks.h" @@ -36,7 +36,7 @@ namespace rtc { namespace { -using ::webrtc::ToQueuedTask; +using ::webrtc::TimeDelta; // Generates a sequence of numbers (collaboratively). class TestGenerator { @@ -373,8 +373,8 @@ TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) { auto thread1 = Thread::CreateWithSocketServer(); auto thread2 = Thread::CreateWithSocketServer(); - thread1->PostTask(ToQueuedTask( - [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + thread1->PostTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }); main_thread.ProcessMessages(100); } @@ -389,11 +389,11 @@ TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) { thread1->AllowInvokesToThread(thread2.get()); thread1->AllowInvokesToThread(thread3.get()); - thread1->PostTask(ToQueuedTask([&]() { + thread1->PostTask([&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get())); EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get())); - })); + }); main_thread.ProcessMessages(100); } @@ -405,9 +405,8 @@ TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) { thread1->DisallowAllInvokes(); - thread1->PostTask(ToQueuedTask([&]() { - EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); - })); + thread1->PostTask( + [&]() { EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); }); main_thread.ProcessMessages(100); } #endif // (!defined(NDEBUG) || RTC_DCHECK_IS_ON) @@ -418,8 +417,8 @@ TEST(ThreadTest, InvokesAllowedByDefault) { auto thread1 = Thread::CreateWithSocketServer(); auto thread2 = Thread::CreateWithSocketServer(); - thread1->PostTask(ToQueuedTask( - [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); + thread1->PostTask( + [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); }); main_thread.ProcessMessages(100); } @@ -672,11 +671,11 @@ TEST(ThreadManager, ProcessAllMessageQueues) { }; // Post messages (both delayed and non delayed) to both threads. - a->PostTask(ToQueuedTask(incrementer)); - b->PostTask(ToQueuedTask(incrementer)); - a->PostDelayedTask(ToQueuedTask(incrementer), 0); - b->PostDelayedTask(ToQueuedTask(incrementer), 0); - main_thread.PostTask(ToQueuedTask(event_signaler)); + a->PostTask(incrementer); + b->PostTask(incrementer); + a->PostDelayedTask(incrementer, TimeDelta::Zero()); + b->PostDelayedTask(incrementer, TimeDelta::Zero()); + main_thread.PostTask(event_signaler); ThreadManager::ProcessAllMessageQueuesForTesting(); EXPECT_EQ(4, messages_processed.load(std::memory_order_acquire)); @@ -1083,7 +1082,7 @@ TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) { WaitAndSetEvent(&event_set_by_test_thread, &event_set_by_background_thread); }, - /*milliseconds=*/10); + TimeDelta::Millis(10)); event_set_by_test_thread.Set(); event_set_by_background_thread.Wait(Event::kForever); } @@ -1100,18 +1099,18 @@ TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) { background_thread->PostDelayedTask( [&third, &fourth] { WaitAndSetEvent(&third, &fourth); }, - /*milliseconds=*/11); + TimeDelta::Millis(11)); background_thread->PostDelayedTask( [&first, &second] { WaitAndSetEvent(&first, &second); }, - /*milliseconds=*/9); + TimeDelta::Millis(9)); background_thread->PostDelayedTask( [&second, &third] { WaitAndSetEvent(&second, &third); }, - /*milliseconds=*/10); + TimeDelta::Millis(10)); // All tasks have been posted before the first one is unblocked. first.Set(); // Only if the chain is invoked in delay order will the last event be set. - clock.AdvanceTime(webrtc::TimeDelta::Millis(11)); + clock.AdvanceTime(TimeDelta::Millis(11)); EXPECT_TRUE(fourth.Wait(0)); }