In rtc::Thread implement posting AnyInvocable

Lots of code call rtc::Thread directly instead of through TaskQueueBase
interface, thus to continue migration step by step rtc::Thread needs
to implement both old and new TaskQueueBase interfaces.

Bug: webrtc:14245
Change-Id: Ie7cac897a4c8a6227b8d467a39adb30aec6f1318
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/267984
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37474}
This commit is contained in:
Danil Chapovalov
2022-07-06 19:42:34 +02:00
committed by WebRTC LUCI CQ
parent 791294a647
commit 4bcf809df7
4 changed files with 93 additions and 107 deletions

View File

@ -914,6 +914,8 @@ rtc_library("threading") {
absl_deps = [ absl_deps = [
"//third_party/abseil-cpp/absl/algorithm:container", "//third_party/abseil-cpp/absl/algorithm:container",
"//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
] ]
deps = [ deps = [
@ -943,6 +945,7 @@ rtc_library("threading") {
"../api/task_queue", "../api/task_queue",
"../api/task_queue:pending_task_safety_flag", "../api/task_queue:pending_task_safety_flag",
"../api/task_queue:to_queued_task", "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"synchronization:mutex", "synchronization:mutex",
"system:no_unique_address", "system:no_unique_address",
"system:rtc_export", "system:rtc_export",
@ -1730,6 +1733,7 @@ if (rtc_include_tests) {
"../api/task_queue:pending_task_safety_flag", "../api/task_queue:pending_task_safety_flag",
"../api/task_queue:task_queue_test", "../api/task_queue:task_queue_test",
"../api/task_queue:to_queued_task", "../api/task_queue:to_queued_task",
"../api/units:time_delta",
"../test:field_trial", "../test:field_trial",
"../test:fileutils", "../test:fileutils",
"../test:rtc_expect_death", "../test:rtc_expect_death",

View File

@ -31,8 +31,8 @@
#include <utility> #include <utility>
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "absl/cleanup/cleanup.h"
#include "api/sequence_checker.h" #include "api/sequence_checker.h"
#include "api/task_queue/to_queued_task.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/event.h" #include "rtc_base/event.h"
@ -72,22 +72,25 @@ class ScopedAutoReleasePool {
namespace rtc { namespace rtc {
namespace { namespace {
class MessageHandlerWithTask final : public MessageHandler { struct AnyInvocableMessage final : public MessageData {
explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
: task(std::move(task)) {}
absl::AnyInvocable<void() &&> task;
};
class AnyInvocableMessageHandler final : public MessageHandler {
public: public:
MessageHandlerWithTask() {}
MessageHandlerWithTask(const MessageHandlerWithTask&) = delete;
MessageHandlerWithTask& operator=(const MessageHandlerWithTask&) = delete;
void OnMessage(Message* msg) override { void OnMessage(Message* msg) override {
static_cast<rtc_thread_internal::MessageLikeTask*>(msg->pdata)->Run(); std::move(static_cast<AnyInvocableMessage*>(msg->pdata)->task)();
delete msg->pdata; delete msg->pdata;
} }
private:
~MessageHandlerWithTask() override {}
}; };
MessageHandler* GetAnyInvocableMessageHandler() {
static MessageHandler* const handler = new AnyInvocableMessageHandler;
return handler;
}
class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { class RTC_SCOPED_LOCKABLE MarkProcessingCritScope {
public: public:
MarkProcessingCritScope(const RecursiveCriticalSection* cs, MarkProcessingCritScope(const RecursiveCriticalSection* cs,
@ -761,8 +764,7 @@ bool Thread::SetName(absl::string_view name, const void* obj) {
void Thread::SetDispatchWarningMs(int deadline) { void Thread::SetDispatchWarningMs(int deadline) {
if (!IsCurrent()) { if (!IsCurrent()) {
PostTask(webrtc::ToQueuedTask( PostTask([this, deadline]() { SetDispatchWarningMs(deadline); });
[this, deadline]() { SetDispatchWarningMs(deadline); }));
return; return;
} }
RTC_DCHECK_RUN_ON(this); RTC_DCHECK_RUN_ON(this);
@ -948,18 +950,19 @@ void Thread::Send(const Location& posted_from,
done_event.reset(new rtc::Event()); done_event.reset(new rtc::Event());
bool ready = false; bool ready = false;
PostTask(webrtc::ToQueuedTask( absl::Cleanup cleanup = [this, &ready, current_thread,
[&msg]() mutable { msg.phandler->OnMessage(&msg); }, done = done_event.get()] {
[this, &ready, current_thread, done = done_event.get()] { if (current_thread) {
if (current_thread) { CritScope cs(&crit_);
CritScope cs(&crit_); ready = true;
ready = true; current_thread->socketserver()->WakeUp();
current_thread->socketserver()->WakeUp(); } else {
} else { done->Set();
done->Set(); }
} };
})); PostTask([&msg, cleanup = std::move(cleanup)]() mutable {
msg.phandler->OnMessage(&msg);
});
if (current_thread) { if (current_thread) {
bool waited = false; bool waited = false;
crit_.Enter(); crit_.Enter();
@ -1115,6 +1118,28 @@ void Thread::Delete() {
delete this; delete this;
} }
void Thread::PostTask(absl::AnyInvocable<void() &&> task) {
// Though Post takes MessageData by raw pointer (last parameter), it still
// takes it with ownership.
Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(),
/*id=*/0, new AnyInvocableMessage(std::move(task)));
}
void Thread::PostDelayedTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) {
// This implementation does not support low precision yet.
PostDelayedHighPrecisionTask(std::move(task), delay);
}
void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) {
int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms<int>();
// Though PostDelayed takes MessageData by raw pointer (last parameter),
// it still takes it with ownership.
PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(),
/*id=*/0, new AnyInvocableMessage(std::move(task)));
}
bool Thread::IsProcessingMessagesForTesting() { bool Thread::IsProcessingMessagesForTesting() {
return (owned_ || IsCurrent()) && !IsQuitting(); return (owned_ || IsCurrent()) && !IsQuitting();
} }
@ -1183,13 +1208,6 @@ bool Thread::IsRunning() {
#endif #endif
} }
// static
MessageHandler* Thread::GetPostTaskMessageHandler() {
// Allocate at first call, never deallocate.
static MessageHandler* handler = new MessageHandlerWithTask;
return handler;
}
AutoThread::AutoThread() AutoThread::AutoThread()
: Thread(CreateDefaultSocketServer(), /*do_init=*/false) { : Thread(CreateDefaultSocketServer(), /*do_init=*/false) {
if (!ThreadManager::Instance()->CurrentThread()) { if (!ThreadManager::Instance()->CurrentThread()) {

View File

@ -28,10 +28,12 @@
#include <pthread.h> #include <pthread.h>
#endif #endif
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/functional/any_invocable.h"
#include "api/function_view.h" #include "api/function_view.h"
#include "api/task_queue/queued_task.h" #include "api/task_queue/queued_task.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/task_queue/to_queued_task.h" #include "api/task_queue/to_queued_task.h"
#include "api/units/time_delta.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
#include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/deprecated/recursive_critical_section.h"
#include "rtc_base/location.h" #include "rtc_base/location.h"
@ -79,32 +81,6 @@ namespace rtc {
class Thread; class Thread;
namespace rtc_thread_internal {
class MessageLikeTask : public MessageData {
public:
virtual void Run() = 0;
};
template <class FunctorT>
class MessageWithFunctor final : public MessageLikeTask {
public:
explicit MessageWithFunctor(FunctorT&& functor)
: functor_(std::forward<FunctorT>(functor)) {}
MessageWithFunctor(const MessageWithFunctor&) = delete;
MessageWithFunctor& operator=(const MessageWithFunctor&) = delete;
void Run() override { functor_(); }
private:
~MessageWithFunctor() override {}
typename std::remove_reference<FunctorT>::type functor_;
};
} // namespace rtc_thread_internal
class RTC_EXPORT ThreadManager { class RTC_EXPORT ThreadManager {
public: public:
static const int kForever = -1; static const int kForever = -1;
@ -418,36 +394,29 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
bool IsInvokeToThreadAllowed(rtc::Thread* target); bool IsInvokeToThreadAllowed(rtc::Thread* target);
// From TaskQueueBase // From TaskQueueBase
void Delete() override;
void PostTask(absl::AnyInvocable<void() &&> task) override;
void PostDelayedTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) override;
void PostDelayedHighPrecisionTask(absl::AnyInvocable<void() &&> task,
webrtc::TimeDelta delay) override;
// Legacy TaskQueueBase methods, do not use in new code.
// TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
// directly is updated to use PostTask methods above.
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override; void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override;
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task, void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override; uint32_t milliseconds) override;
void PostDelayedHighPrecisionTask(std::unique_ptr<webrtc::QueuedTask> task, void PostDelayedHighPrecisionTask(std::unique_ptr<webrtc::QueuedTask> task,
uint32_t milliseconds) override; uint32_t milliseconds) override;
void Delete() override;
// Helper methods to avoid having to do ToQueuedTask() at the calling places. // Legacy helper method, do not use in new code.
template <class Closure, // TODO(bugs.webrtc.org/14245): Delete when all code that use rtc::Thread
typename std::enable_if<!std::is_convertible< // directly is updated to use PostTask methods above.
Closure, ABSL_DEPRECATED("Pass delay as webrtc::TimeDelta type")
std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr> void PostDelayedTask(absl::AnyInvocable<void() &&> task,
void PostTask(Closure&& closure) { uint32_t milliseconds) {
PostTask(webrtc::ToQueuedTask(std::forward<Closure>(closure))); PostDelayedTask(std::move(task), webrtc::TimeDelta::Millis(milliseconds));
}
template <class Closure,
typename std::enable_if<!std::is_convertible<
Closure,
std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
void PostDelayedTask(Closure&& closure, uint32_t milliseconds) {
PostDelayedTask(webrtc::ToQueuedTask(std::forward<Closure>(closure)),
milliseconds);
}
template <class Closure,
typename std::enable_if<!std::is_convertible<
Closure,
std::unique_ptr<webrtc::QueuedTask>>::value>::type* = nullptr>
void PostDelayedHighPrecisionTask(Closure&& closure, uint32_t milliseconds) {
PostDelayedHighPrecisionTask(
webrtc::ToQueuedTask(std::forward<Closure>(closure)), milliseconds);
} }
// ProcessMessages will process I/O and dispatch messages until: // ProcessMessages will process I/O and dispatch messages until:
@ -609,10 +578,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
// Called by the ThreadManager when being unset as the current thread. // Called by the ThreadManager when being unset as the current thread.
void ClearCurrentTaskQueue(); void ClearCurrentTaskQueue();
// Returns a static-lifetime MessageHandler which runs message with
// MessageLikeTask payload data.
static MessageHandler* GetPostTaskMessageHandler();
bool fPeekKeep_; bool fPeekKeep_;
Message msgPeek_; Message msgPeek_;
MessageList messages_ RTC_GUARDED_BY(crit_); MessageList messages_ RTC_GUARDED_BY(crit_);

View File

@ -14,7 +14,7 @@
#include "api/task_queue/task_queue_factory.h" #include "api/task_queue/task_queue_factory.h"
#include "api/task_queue/task_queue_test.h" #include "api/task_queue/task_queue_test.h"
#include "api/task_queue/to_queued_task.h" #include "api/units/time_delta.h"
#include "rtc_base/async_invoker.h" #include "rtc_base/async_invoker.h"
#include "rtc_base/async_udp_socket.h" #include "rtc_base/async_udp_socket.h"
#include "rtc_base/checks.h" #include "rtc_base/checks.h"
@ -36,7 +36,7 @@
namespace rtc { namespace rtc {
namespace { namespace {
using ::webrtc::ToQueuedTask; using ::webrtc::TimeDelta;
// Generates a sequence of numbers (collaboratively). // Generates a sequence of numbers (collaboratively).
class TestGenerator { class TestGenerator {
@ -373,8 +373,8 @@ TEST(ThreadTest, InvokeToThreadAllowedReturnsTrueWithoutPolicies) {
auto thread1 = Thread::CreateWithSocketServer(); auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer(); auto thread2 = Thread::CreateWithSocketServer();
thread1->PostTask(ToQueuedTask( thread1->PostTask(
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
main_thread.ProcessMessages(100); main_thread.ProcessMessages(100);
} }
@ -389,11 +389,11 @@ TEST(ThreadTest, InvokeAllowedWhenThreadsAdded) {
thread1->AllowInvokesToThread(thread2.get()); thread1->AllowInvokesToThread(thread2.get());
thread1->AllowInvokesToThread(thread3.get()); thread1->AllowInvokesToThread(thread3.get());
thread1->PostTask(ToQueuedTask([&]() { thread1->PostTask([&]() {
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get()));
EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get())); EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread3.get()));
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get())); EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread4.get()));
})); });
main_thread.ProcessMessages(100); main_thread.ProcessMessages(100);
} }
@ -405,9 +405,8 @@ TEST(ThreadTest, InvokesDisallowedWhenDisallowAllInvokes) {
thread1->DisallowAllInvokes(); thread1->DisallowAllInvokes();
thread1->PostTask(ToQueuedTask([&]() { thread1->PostTask(
EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); [&]() { EXPECT_FALSE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
}));
main_thread.ProcessMessages(100); main_thread.ProcessMessages(100);
} }
#endif // (!defined(NDEBUG) || RTC_DCHECK_IS_ON) #endif // (!defined(NDEBUG) || RTC_DCHECK_IS_ON)
@ -418,8 +417,8 @@ TEST(ThreadTest, InvokesAllowedByDefault) {
auto thread1 = Thread::CreateWithSocketServer(); auto thread1 = Thread::CreateWithSocketServer();
auto thread2 = Thread::CreateWithSocketServer(); auto thread2 = Thread::CreateWithSocketServer();
thread1->PostTask(ToQueuedTask( thread1->PostTask(
[&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); })); [&]() { EXPECT_TRUE(thread1->IsInvokeToThreadAllowed(thread2.get())); });
main_thread.ProcessMessages(100); main_thread.ProcessMessages(100);
} }
@ -672,11 +671,11 @@ TEST(ThreadManager, ProcessAllMessageQueues) {
}; };
// Post messages (both delayed and non delayed) to both threads. // Post messages (both delayed and non delayed) to both threads.
a->PostTask(ToQueuedTask(incrementer)); a->PostTask(incrementer);
b->PostTask(ToQueuedTask(incrementer)); b->PostTask(incrementer);
a->PostDelayedTask(ToQueuedTask(incrementer), 0); a->PostDelayedTask(incrementer, TimeDelta::Zero());
b->PostDelayedTask(ToQueuedTask(incrementer), 0); b->PostDelayedTask(incrementer, TimeDelta::Zero());
main_thread.PostTask(ToQueuedTask(event_signaler)); main_thread.PostTask(event_signaler);
ThreadManager::ProcessAllMessageQueuesForTesting(); ThreadManager::ProcessAllMessageQueuesForTesting();
EXPECT_EQ(4, messages_processed.load(std::memory_order_acquire)); EXPECT_EQ(4, messages_processed.load(std::memory_order_acquire));
@ -1083,7 +1082,7 @@ TEST(ThreadPostDelayedTaskTest, InvokesAsynchronously) {
WaitAndSetEvent(&event_set_by_test_thread, WaitAndSetEvent(&event_set_by_test_thread,
&event_set_by_background_thread); &event_set_by_background_thread);
}, },
/*milliseconds=*/10); TimeDelta::Millis(10));
event_set_by_test_thread.Set(); event_set_by_test_thread.Set();
event_set_by_background_thread.Wait(Event::kForever); event_set_by_background_thread.Wait(Event::kForever);
} }
@ -1100,18 +1099,18 @@ TEST(ThreadPostDelayedTaskTest, InvokesInDelayOrder) {
background_thread->PostDelayedTask( background_thread->PostDelayedTask(
[&third, &fourth] { WaitAndSetEvent(&third, &fourth); }, [&third, &fourth] { WaitAndSetEvent(&third, &fourth); },
/*milliseconds=*/11); TimeDelta::Millis(11));
background_thread->PostDelayedTask( background_thread->PostDelayedTask(
[&first, &second] { WaitAndSetEvent(&first, &second); }, [&first, &second] { WaitAndSetEvent(&first, &second); },
/*milliseconds=*/9); TimeDelta::Millis(9));
background_thread->PostDelayedTask( background_thread->PostDelayedTask(
[&second, &third] { WaitAndSetEvent(&second, &third); }, [&second, &third] { WaitAndSetEvent(&second, &third); },
/*milliseconds=*/10); TimeDelta::Millis(10));
// All tasks have been posted before the first one is unblocked. // All tasks have been posted before the first one is unblocked.
first.Set(); first.Set();
// Only if the chain is invoked in delay order will the last event be set. // Only if the chain is invoked in delay order will the last event be set.
clock.AdvanceTime(webrtc::TimeDelta::Millis(11)); clock.AdvanceTime(TimeDelta::Millis(11));
EXPECT_TRUE(fourth.Wait(0)); EXPECT_TRUE(fourth.Wait(0));
} }