From 207f8536b2e05d1824c8a330be24fc9de5e91628 Mon Sep 17 00:00:00 2001 From: Danil Chapovalov Date: Wed, 24 Aug 2022 12:19:46 +0200 Subject: [PATCH] In rtc::Thread hide MessageHandler handling as implementation details Remote Peek function as unused Move Get and Dispatch into private section to ensure they are not used from outside. Bug: webrtc:9702 Change-Id: Ibd0b236fe43543d60f97f988524526493bbeaaa7 Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272804 Reviewed-by: Tomas Gunnarsson Commit-Queue: Danil Chapovalov Cr-Commit-Position: refs/heads/main@{#37889} --- rtc_base/thread.cc | 35 +---------------------- rtc_base/thread.h | 28 +++++++++++-------- rtc_base/thread_unittest.cc | 46 +++++++++++++++++++------------ rtc_base/virtual_socket_server.cc | 5 +--- 4 files changed, 46 insertions(+), 68 deletions(-) diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 56b6b43ef3..99469896e6 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -380,8 +380,7 @@ Thread::Thread(std::unique_ptr ss) : Thread(std::move(ss), /*do_init=*/true) {} Thread::Thread(SocketServer* ss, bool do_init) - : fPeekKeep_(false), - delayed_next_num_(0), + : delayed_next_num_(0), fInitialized_(false), fDestroyed_(false), stop_(0), @@ -450,28 +449,7 @@ void Thread::Restart() { stop_.store(0, std::memory_order_release); } -bool Thread::Peek(Message* pmsg, int cmsWait) { - if (fPeekKeep_) { - *pmsg = msgPeek_; - return true; - } - if (!Get(pmsg, cmsWait)) - return false; - msgPeek_ = *pmsg; - fPeekKeep_ = true; - return true; -} - bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { - // Return and clear peek if present - // Always return the peek if it exists so there is Peek/Get symmetry - - if (fPeekKeep_) { - *pmsg = msgPeek_; - fPeekKeep_ = false; - return true; - } - // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch int64_t cmsTotal = cmsWait; @@ -650,17 +628,6 @@ int Thread::GetDelay() { void Thread::ClearInternal(MessageHandler* phandler, uint32_t id, MessageList* removed) { - // Remove messages with phandler - - if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { - if (removed) { - removed->push_back(msgPeek_); - } else { - delete msgPeek_.pdata; - } - fPeekKeep_ = false; - } - // Remove from ordered message queue for (auto it = messages_.begin(); it != messages_.end();) { diff --git a/rtc_base/thread.h b/rtc_base/thread.h index 77ccc9e2ab..ef43e51a75 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -268,14 +268,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Processed. Normally, this would be true until IsQuitting() is true. virtual bool IsProcessingMessagesForTesting(); - // Get() will process I/O until: - // 1) A message is available (returns true) - // 2) cmsWait seconds have elapsed (returns false) - // 3) Stop() is called (returns false) - virtual bool Get(Message* pmsg, - int cmsWait = kForever, - bool process_io = true); - virtual bool Peek(Message* pmsg, int cmsWait = 0); // `time_sensitive` is deprecated and should always be false. virtual void Post(const Location& posted_from, MessageHandler* phandler, @@ -295,7 +287,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { virtual void Clear(MessageHandler* phandler, uint32_t id = MQID_ANY, MessageList* removed = nullptr); - virtual void Dispatch(Message* pmsg); // Amount of time until the next message can be retrieved virtual int GetDelay(); @@ -303,7 +294,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { bool empty() const { return size() == 0u; } size_t size() const { CritScope cs(&crit_); - return messages_.size() + delayed_messages_.size() + (fPeekKeep_ ? 1u : 0u); + return messages_.size() + delayed_messages_.size(); } // Internally posts a message which causes the doomed object to be deleted @@ -522,6 +513,21 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { private: static const int kSlowDispatchLoggingThreshold = 50; // 50 ms + // TODO(bugs.webrtc.org/9702): Delete when chromium stops overriding it. + // chromium's ThreadWrapper overrides it just to check it is never called. + virtual bool Peek(Message* pmsg, int cms_wait) { + RTC_DCHECK_NOTREACHED(); + return false; + } + // Get() will process I/O until: + // 1) A message is available (returns true) + // 2) cmsWait seconds have elapsed (returns false) + // 3) Stop() is called (returns false) + virtual bool Get(Message* pmsg, + int cmsWait = kForever, + bool process_io = true); + virtual void Dispatch(Message* pmsg); + // Sets the per-thread allow-blocking-calls flag and returns the previous // value. Must be called on this thread. bool SetAllowBlockingCalls(bool allow); @@ -552,8 +558,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { // Called by the ThreadManager when being unset as the current thread. void ClearCurrentTaskQueue(); - bool fPeekKeep_; - Message msgPeek_; MessageList messages_ RTC_GUARDED_BY(crit_); PriorityQueue delayed_messages_ RTC_GUARDED_BY(crit_); uint32_t delayed_next_num_ RTC_GUARDED_BY(crit_); diff --git a/rtc_base/thread_unittest.cc b/rtc_base/thread_unittest.cc index b06b09e235..68416f5557 100644 --- a/rtc_base/thread_unittest.cc +++ b/rtc_base/thread_unittest.cc @@ -20,6 +20,7 @@ #include "rtc_base/async_udp_socket.h" #include "rtc_base/checks.h" #include "rtc_base/event.h" +#include "rtc_base/fake_clock.h" #include "rtc_base/gunit.h" #include "rtc_base/internal/default_socket_server.h" #include "rtc_base/null_socket_server.h" @@ -27,6 +28,7 @@ #include "rtc_base/socket_address.h" #include "rtc_base/synchronization/mutex.h" #include "rtc_base/third_party/sigslot/sigslot.h" +#include "test/gmock.h" #include "test/testsupport/rtc_expect_death.h" #if defined(WEBRTC_WIN) @@ -37,6 +39,7 @@ namespace rtc { namespace { +using ::testing::ElementsAre; using ::webrtc::TimeDelta; // Generates a sequence of numbers (collaboratively). @@ -585,32 +588,39 @@ struct DeletedLockChecker { bool* deleted; }; -static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) { - EXPECT_TRUE(q != nullptr); +static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder( + FakeClock& clock, + Thread& q) { + std::vector run_order; + + Event done; int64_t now = TimeMillis(); - q->PostAt(RTC_FROM_HERE, now, nullptr, 3); - q->PostAt(RTC_FROM_HERE, now - 2, nullptr, 0); - q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 1); - q->PostAt(RTC_FROM_HERE, now, nullptr, 4); - q->PostAt(RTC_FROM_HERE, now - 1, nullptr, 2); + q.PostDelayedTask([&] { run_order.push_back(3); }, TimeDelta::Millis(3)); + q.PostDelayedTask([&] { run_order.push_back(0); }, TimeDelta::Millis(1)); + q.PostDelayedTask([&] { run_order.push_back(1); }, TimeDelta::Millis(2)); + q.PostDelayedTask([&] { run_order.push_back(4); }, TimeDelta::Millis(3)); + q.PostDelayedTask([&] { run_order.push_back(2); }, TimeDelta::Millis(2)); + q.PostDelayedTask([&] { done.Set(); }, TimeDelta::Millis(4)); + // Validate time was frozen while tasks were posted. + RTC_DCHECK_EQ(TimeMillis(), now); - Message msg; - for (size_t i = 0; i < 5; ++i) { - memset(&msg, 0, sizeof(msg)); - EXPECT_TRUE(q->Get(&msg, 0)); - EXPECT_EQ(i, msg.message_id); - } + // Change time to make all tasks ready to run and wait for them. + clock.AdvanceTime(TimeDelta::Millis(4)); + ASSERT_TRUE(done.Wait(TimeDelta::Seconds(1))); - EXPECT_FALSE(q->Get(&msg, 0)); // No more messages + EXPECT_THAT(run_order, ElementsAre(0, 1, 2, 3, 4)); } TEST_F(ThreadQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) { + ScopedBaseFakeClock clock; Thread q(CreateDefaultSocketServer(), true); - DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q); + q.Start(); + DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q); NullSocketServer nullss; Thread q_nullss(&nullss, true); - DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss); + q_nullss.Start(); + DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(clock, q_nullss); } TEST_F(ThreadQueueTest, DisposeNotLocked) { @@ -619,7 +629,7 @@ TEST_F(ThreadQueueTest, DisposeNotLocked) { DeletedLockChecker* d = new DeletedLockChecker(this, &was_locked, &deleted); Dispose(d); Message msg; - EXPECT_FALSE(Get(&msg, 0)); + ProcessMessages(0); EXPECT_TRUE(deleted); EXPECT_FALSE(was_locked); } @@ -642,7 +652,7 @@ TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) { // Now, post a message, which should *not* be returned by Get(). Post(RTC_FROM_HERE, handler, 1); Message msg; - EXPECT_FALSE(Get(&msg, 0)); + ProcessMessages(0); EXPECT_TRUE(deleted); } diff --git a/rtc_base/virtual_socket_server.cc b/rtc_base/virtual_socket_server.cc index 91b486b24e..a055e59517 100644 --- a/rtc_base/virtual_socket_server.cc +++ b/rtc_base/virtual_socket_server.cc @@ -647,10 +647,7 @@ bool VirtualSocketServer::ProcessMessagesUntilIdle() { fake_clock_->AdvanceTime(webrtc::TimeDelta::Millis(1)); } else { // Otherwise, run a normal message loop. - Message msg; - if (msg_queue_->Get(&msg, Thread::kForever)) { - msg_queue_->Dispatch(&msg); - } + msg_queue_->ProcessMessages(Thread::kForever); } } stop_on_idle_ = false;