From d44e3410b60d63696f303dc498b9ad2086770041 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Fri, 16 Sep 2022 17:26:10 +0200 Subject: [PATCH] Delete rtc::Thread functions that use rtc::MessageHandler Bug: webrtc:9702 Change-Id: I6fc8aa8a793caf19d62a149db1861c352c609255 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/275774 Commit-Queue: Danil Chapovalov Reviewed-by: Tomas Gunnarsson Cr-Commit-Position: refs/heads/main@{#38150} --- rtc_base/thread.cc | 179 ++++------------------- rtc_base/thread.h | 86 +++-------- rtc_base/thread_unittest.cc | 26 ---- test/time_controller/simulated_thread.cc | 32 ++-- test/time_controller/simulated_thread.h | 20 +-- 5 files changed, 67 insertions(+), 276 deletions(-) diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 7525096f91..fbf4105a8a 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -76,25 +76,6 @@ namespace { using ::webrtc::TimeDelta; -struct AnyInvocableMessage final : public MessageData { - explicit AnyInvocableMessage(absl::AnyInvocable task) - : task(std::move(task)) {} - absl::AnyInvocable task; -}; - -class AnyInvocableMessageHandler final : public MessageHandler { - public: - void OnMessage(Message* msg) override { - std::move(static_cast(msg->pdata)->task)(); - delete msg->pdata; - } -}; - -MessageHandler* GetAnyInvocableMessageHandler() { - static MessageHandler* const handler = new AnyInvocableMessageHandler; - return handler; -} - class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { public: MarkProcessingCritScope(const RecursiveCriticalSection* cs, @@ -407,7 +388,9 @@ void Thread::DoDestroy() { ss_->SetMessageQueue(nullptr); } ThreadManager::Remove(this); - ClearInternal(nullptr, MQID_ANY, nullptr); + // Clear. + messages_ = {}; + delayed_messages_ = {}; } SocketServer* Thread::socketserver() { @@ -431,7 +414,7 @@ void Thread::Restart() { stop_.store(0, std::memory_order_release); } -bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { +absl::AnyInvocable Thread::Get(int cmsWait) { // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch int64_t cmsTotal = cmsWait; @@ -448,19 +431,19 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { // Check for delayed messages that have been triggered and calculate the // next trigger time. while (!delayed_messages_.empty()) { - if (msCurrent < delayed_messages_.top().run_time_ms_) { + if (msCurrent < delayed_messages_.top().run_time_ms) { cmsDelayNext = - TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent); + TimeDiff(delayed_messages_.top().run_time_ms, msCurrent); break; } - messages_.push_back(delayed_messages_.top().msg_); + messages_.push(std::move(delayed_messages_.top().functor)); delayed_messages_.pop(); } // Pull a message off the message queue, if available. if (!messages_.empty()) { - *pmsg = messages_.front(); - messages_.pop_front(); - return true; + absl::AnyInvocable task = std::move(messages_.front()); + messages_.pop(); + return task; } } @@ -482,8 +465,8 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { // Wait and multiplex in the meantime if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever : webrtc::TimeDelta::Millis(cmsNext), - process_io)) - return false; + /*process_io=*/true)) + return nullptr; } // If the specified timeout expired, return @@ -492,20 +475,14 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { cmsElapsed = TimeDiff(msCurrent, msStart); if (cmsWait != kForever) { if (cmsElapsed >= cmsWait) - return false; + return nullptr; } } - return false; + return nullptr; } -void Thread::Post(const Location& posted_from, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata, - bool time_sensitive) { - RTC_DCHECK(!time_sensitive); +void Thread::PostTask(absl::AnyInvocable task) { if (IsQuitting()) { - delete pdata; return; } @@ -515,42 +492,14 @@ void Thread::Post(const Location& posted_from, { CritScope cs(&crit_); - Message msg; - msg.posted_from = posted_from; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - messages_.push_back(msg); + messages_.push(std::move(task)); } WakeUpSocketServer(); } -void Thread::PostDelayed(const Location& posted_from, - int delay_ms, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - return DoDelayPost(posted_from, delay_ms, TimeAfter(delay_ms), phandler, id, - pdata); -} - -void Thread::PostAt(const Location& posted_from, - int64_t run_at_ms, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - return DoDelayPost(posted_from, TimeUntil(run_at_ms), run_at_ms, phandler, id, - pdata); -} - -void Thread::DoDelayPost(const Location& posted_from, - int64_t delay_ms, - int64_t run_at_ms, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { +void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable task, + webrtc::TimeDelta delay) { if (IsQuitting()) { - delete pdata; return; } @@ -558,15 +507,14 @@ void Thread::DoDelayPost(const Location& posted_from, // Add to the priority queue. Gets sorted soonest first. // Signal for the multiplexer to return. + int64_t delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms(); + int64_t run_time_ms = TimeAfter(delay_ms); { CritScope cs(&crit_); - Message msg; - msg.posted_from = posted_from; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage delayed(delay_ms, run_at_ms, delayed_next_num_, msg); - delayed_messages_.push(delayed); + delayed_messages_.push({.delay_ms = delay_ms, + .run_time_ms = run_time_ms, + .message_number = delayed_next_num_, + .functor = std::move(task)}); // If this message queue processes 1 message every millisecond for 50 days, // we will wrap this number. Even then, only messages with identical times // will be misordered, and then only briefly. This is probably ok. @@ -583,7 +531,7 @@ int Thread::GetDelay() { return 0; if (!delayed_messages_.empty()) { - int delay = TimeUntil(delayed_messages_.top().run_time_ms_); + int delay = TimeUntil(delayed_messages_.top().run_time_ms); if (delay < 0) delay = 0; return delay; @@ -592,56 +540,16 @@ int Thread::GetDelay() { return kForever; } -void Thread::ClearInternal(MessageHandler* phandler, - uint32_t id, - MessageList* removed) { - // Remove from ordered message queue - - for (auto it = messages_.begin(); it != messages_.end();) { - if (it->Match(phandler, id)) { - if (removed) { - removed->push_back(*it); - } else { - delete it->pdata; - } - it = messages_.erase(it); - } else { - ++it; - } - } - - // Remove from priority queue. Not directly iterable, so use this approach - - auto new_end = delayed_messages_.container().begin(); - for (auto it = new_end; it != delayed_messages_.container().end(); ++it) { - if (it->msg_.Match(phandler, id)) { - if (removed) { - removed->push_back(it->msg_); - } else { - delete it->msg_.pdata; - } - } else { - *new_end++ = *it; - } - } - delayed_messages_.container().erase(new_end, - delayed_messages_.container().end()); - delayed_messages_.reheap(); -} - -void Thread::Dispatch(Message* pmsg) { - TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file", - pmsg->posted_from.file_name(), "src_func", - pmsg->posted_from.function_name()); +void Thread::Dispatch(absl::AnyInvocable task) { + TRACE_EVENT0("webrtc", "Thread::Dispatch"); RTC_DCHECK_RUN_ON(this); int64_t start_time = TimeMillis(); - pmsg->phandler->OnMessage(pmsg); + std::move(task)(); int64_t end_time = TimeMillis(); int64_t diff = TimeDiff(end_time, start_time); if (diff >= dispatch_warning_ms_) { RTC_LOG(LS_INFO) << "Message to " << name() << " took " << diff - << "ms to dispatch. Posted from: " - << pmsg->posted_from.ToString(); + << "ms to dispatch."; // To avoid log spew, move the warning limit to only give warning // for delays that are larger than the one observed. dispatch_warning_ms_ = diff + 1; @@ -986,39 +894,16 @@ void Thread::Delete() { delete this; } -void Thread::PostTask(absl::AnyInvocable task) { - // Though Post takes MessageData by raw pointer (last parameter), it still - // takes it with ownership. - Post(RTC_FROM_HERE, GetAnyInvocableMessageHandler(), - /*id=*/0, new AnyInvocableMessage(std::move(task))); -} - void Thread::PostDelayedTask(absl::AnyInvocable task, webrtc::TimeDelta delay) { // This implementation does not support low precision yet. PostDelayedHighPrecisionTask(std::move(task), delay); } -void Thread::PostDelayedHighPrecisionTask(absl::AnyInvocable task, - webrtc::TimeDelta delay) { - int delay_ms = delay.RoundUpTo(webrtc::TimeDelta::Millis(1)).ms(); - // Though PostDelayed takes MessageData by raw pointer (last parameter), - // it still takes it with ownership. - PostDelayed(RTC_FROM_HERE, delay_ms, GetAnyInvocableMessageHandler(), - /*id=*/0, new AnyInvocableMessage(std::move(task))); -} - bool Thread::IsProcessingMessagesForTesting() { return (owned_ || IsCurrent()) && !IsQuitting(); } -void Thread::Clear(MessageHandler* phandler, - uint32_t id, - MessageList* removed) { - CritScope cs(&crit_); - ClearInternal(phandler, id, removed); -} - bool Thread::ProcessMessages(int cmsLoop) { // Using ProcessMessages with a custom clock for testing and a time greater // than 0 doesn't work, since it's not guaranteed to advance the custom @@ -1032,10 +917,10 @@ bool Thread::ProcessMessages(int cmsLoop) { #if defined(WEBRTC_MAC) ScopedAutoReleasePool pool; #endif - Message msg; - if (!Get(&msg, cmsNext)) + absl::AnyInvocable task = Get(cmsNext); + if (!task) return !IsQuitting(); - Dispatch(&msg); + Dispatch(std::move(task)); if (cmsLoop != kForever) { cmsNext = static_cast(TimeUntil(msEnd)); diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 1519976f87..3fa75330a5 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -36,12 +36,10 @@ #include "rtc_base/checks.h" #include "rtc_base/deprecated/recursive_critical_section.h" #include "rtc_base/location.h" -#include "rtc_base/message_handler.h" #include "rtc_base/platform_thread_types.h" #include "rtc_base/socket_server.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/thread_annotations.h" -#include "rtc_base/thread_message.h" #if defined(WEBRTC_WIN) #include "rtc_base/win32.h" @@ -267,26 +265,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Processed. Normally, this would be true until IsQuitting() is true. virtual bool IsProcessingMessagesForTesting(); - // `time_sensitive` is deprecated and should always be false. - virtual void Post(const Location& posted_from, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr, - bool time_sensitive = false); - virtual void PostDelayed(const Location& posted_from, - int delay_ms, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - virtual void PostAt(const Location& posted_from, - int64_t run_at_ms, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - virtual void Clear(MessageHandler* phandler, - uint32_t id = MQID_ANY, - MessageList* removed = nullptr); - // Amount of time until the next message can be retrieved virtual int GetDelay(); @@ -427,54 +405,28 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // DelayedMessage goes into a priority queue, sorted by trigger time. Messages // with the same trigger time are processed in num_ (FIFO) order. - class DelayedMessage { - public: - DelayedMessage(int64_t delay, - int64_t run_time_ms, - uint32_t num, - const Message& msg) - : delay_ms_(delay), - run_time_ms_(run_time_ms), - message_number_(num), - msg_(msg) {} - + struct DelayedMessage { bool operator<(const DelayedMessage& dmsg) const { - return (dmsg.run_time_ms_ < run_time_ms_) || - ((dmsg.run_time_ms_ == run_time_ms_) && - (dmsg.message_number_ < message_number_)); + return (dmsg.run_time_ms < run_time_ms) || + ((dmsg.run_time_ms == run_time_ms) && + (dmsg.message_number < message_number)); } - int64_t delay_ms_; // for debugging - int64_t run_time_ms_; + int64_t delay_ms; // for debugging + int64_t run_time_ms; // Monotonicaly incrementing number used for ordering of messages // targeted to execute at the same time. - uint32_t message_number_; - Message msg_; + uint32_t message_number; + // std::priority_queue doesn't allow to extract elements, but functor + // is move-only and thus need to be changed when pulled out of the + // priority queue. That is ok because `functor` doesn't affect operator< + mutable absl::AnyInvocable functor; }; - class PriorityQueue : public std::priority_queue { - public: - container_type& container() { return c; } - void reheap() { make_heap(c.begin(), c.end(), comp); } - }; - - void DoDelayPost(const Location& posted_from, - int64_t cmsDelay, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata); - // Perform initialization, subclasses must call this from their constructor // if false was passed as init_queue to the Thread constructor. void DoInit(); - // Does not take any lock. Must be called either while holding crit_, or by - // the destructor (by definition, the latter has exclusive access). - void ClearInternal(MessageHandler* phandler, - uint32_t id, - MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); - // Perform cleanup; subclasses must call this from the destructor, // and are not expected to actually hold the lock. void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); @@ -497,13 +449,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { static const int kSlowDispatchLoggingThreshold = 50; // 50 ms // Get() will process I/O until: - // 1) A message is available (returns true) - // 2) cmsWait seconds have elapsed (returns false) - // 3) Stop() is called (returns false) - virtual bool Get(Message* pmsg, - int cmsWait = kForever, - bool process_io = true); - virtual void Dispatch(Message* pmsg); + // 1) A task is available (returns it) + // 2) cmsWait seconds have elapsed (returns empty task) + // 3) Stop() is called (returns empty task) + absl::AnyInvocable Get(int cmsWait); + void Dispatch(absl::AnyInvocable task); // Sets the per-thread allow-blocking-calls flag and returns the previous // value. Must be called on this thread. @@ -532,8 +482,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Called by the ThreadManager when being unset as the current thread. void ClearCurrentTaskQueue(); - MessageList messages_ RTC_GUARDED_BY(crit_); - PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); + std::queue> messages_ RTC_GUARDED_BY(crit_); + std::priority_queue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); #if RTC_DCHECK_IS_ON uint32_t blocking_call_count_ RTC_GUARDED_BY(this) = 0; diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index c0e93bc622..d5b467c1f2 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -553,32 +553,6 @@ TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) { ThreadManager::ProcessAllMessageQueuesForTesting(); } -// Test that ProcessAllMessageQueues doesn't hang if a queue clears its -// messages. -TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) { - rtc::AutoThread main_thread; - Event entered_process_all_message_queues(true, false); - auto t = Thread::CreateWithSocketServer(); - t->Start(); - - auto clearer = [&entered_process_all_message_queues] { - // Wait for event as a means to ensure Clear doesn't occur outside of - // ProcessAllMessageQueues. The event is set by a message posted to the - // main thread, which is guaranteed to be handled inside - // ProcessAllMessageQueues. - entered_process_all_message_queues.Wait(Event::kForever); - rtc::Thread::Current()->Clear(nullptr); - }; - auto event_signaler = [&entered_process_all_message_queues] { - entered_process_all_message_queues.Set(); - }; - - // Post messages (both delayed and non delayed) to both threads. - t->PostTask(clearer); - main_thread.PostTask(event_signaler); - ThreadManager::ProcessAllMessageQueuesForTesting(); -} - void WaitAndSetEvent(Event* wait_event, Event* set_event) { wait_event->Wait(Event::kForever); set_event->Set(); diff --git a/test/time_controller/simulated_thread.cc b/test/time_controller/simulated_thread.cc index 266b76fb54..bdd1096327 100644 --- a/test/time_controller/simulated_thread.cc +++ b/test/time_controller/simulated_thread.cc @@ -77,35 +77,27 @@ void SimulatedThread::BlockingCall(rtc::FunctionView functor) { } } -void SimulatedThread::Post(const rtc::Location& posted_from, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata, - bool time_sensitive) { - rtc::Thread::Post(posted_from, phandler, id, pdata, time_sensitive); +void SimulatedThread::PostTask(absl::AnyInvocable task) { + rtc::Thread::PostTask(std::move(task)); MutexLock lock(&lock_); next_run_time_ = Timestamp::MinusInfinity(); } -void SimulatedThread::PostDelayed(const rtc::Location& posted_from, - int delay_ms, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata) { - rtc::Thread::PostDelayed(posted_from, delay_ms, phandler, id, pdata); +void SimulatedThread::PostDelayedTask(absl::AnyInvocable task, + TimeDelta delay) { + rtc::Thread::PostDelayedTask(std::move(task), delay); MutexLock lock(&lock_); next_run_time_ = - std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis() + delay_ms)); + std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay); } -void SimulatedThread::PostAt(const rtc::Location& posted_from, - int64_t target_time_ms, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata) { - rtc::Thread::PostAt(posted_from, target_time_ms, phandler, id, pdata); +void SimulatedThread::PostDelayedHighPrecisionTask( + absl::AnyInvocable task, + TimeDelta delay) { + rtc::Thread::PostDelayedHighPrecisionTask(std::move(task), delay); MutexLock lock(&lock_); - next_run_time_ = std::min(next_run_time_, Timestamp::Millis(target_time_ms)); + next_run_time_ = + std::min(next_run_time_, Timestamp::Millis(rtc::TimeMillis()) + delay); } void SimulatedThread::Stop() { diff --git a/test/time_controller/simulated_thread.h b/test/time_controller/simulated_thread.h index 9272e28b56..e8e08c5000 100644 --- a/test/time_controller/simulated_thread.h +++ b/test/time_controller/simulated_thread.h @@ -37,21 +37,11 @@ class SimulatedThread : public rtc::Thread, // Thread interface void BlockingCall(rtc::FunctionView functor) override; - void Post(const rtc::Location& posted_from, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata, - bool time_sensitive) override; - void PostDelayed(const rtc::Location& posted_from, - int delay_ms, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata) override; - void PostAt(const rtc::Location& posted_from, - int64_t target_time_ms, - rtc::MessageHandler* phandler, - uint32_t id, - rtc::MessageData* pdata) override; + void PostTask(absl::AnyInvocable task) override; + void PostDelayedTask(absl::AnyInvocable task, + TimeDelta delay) override; + void PostDelayedHighPrecisionTask(absl::AnyInvocable task, + TimeDelta delay) override; void Stop() override;