diff --git a/rtc_base/BUILD.gn b/rtc_base/BUILD.gn index d09dde83ac..0c8a477134 100644 --- a/rtc_base/BUILD.gn +++ b/rtc_base/BUILD.gn @@ -845,7 +845,6 @@ rtc_library("rtc_base") { "message_digest.h", "message_handler.cc", "message_handler.h", - "message_queue.cc", "message_queue.h", "net_helper.cc", "net_helper.h", @@ -909,6 +908,7 @@ rtc_library("rtc_base") { "stream.h", "thread.cc", "thread.h", + "thread_message.h", "unique_id_generator.cc", "unique_id_generator.h", ] diff --git a/rtc_base/async_invoker.cc b/rtc_base/async_invoker.cc index 8584bdaea2..26f8c523ab 100644 --- a/rtc_base/async_invoker.cc +++ b/rtc_base/async_invoker.cc @@ -23,14 +23,14 @@ AsyncInvoker::AsyncInvoker() AsyncInvoker::~AsyncInvoker() { destroying_.store(true, std::memory_order_relaxed); // Messages for this need to be cleared *before* our destructor is complete. - MessageQueueManager::Clear(this); + ThreadManager::Clear(this); // And we need to wait for any invocations that are still in progress on // other threads. Using memory_order_acquire for synchronization with // AsyncClosure destructors. while (pending_invocations_.load(std::memory_order_acquire) > 0) { // If the destructor was called while AsyncInvoke was being called by // another thread, WITHIN an AsyncInvoked functor, it may do another - // Thread::Post even after we called MessageQueueManager::Clear(this). So + // Thread::Post even after we called ThreadManager::Clear(this). So // we need to keep calling Clear to discard these posts. Thread::Current()->Clear(this); invocation_complete_->Wait(Event::kForever); @@ -68,7 +68,7 @@ void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) { } void AsyncInvoker::Clear() { - MessageQueueManager::Clear(this); + ThreadManager::Clear(this); } void AsyncInvoker::DoInvoke(const Location& posted_from, diff --git a/rtc_base/fake_clock.cc b/rtc_base/fake_clock.cc index b9f0ee95e5..e242e8e659 100644 --- a/rtc_base/fake_clock.cc +++ b/rtc_base/fake_clock.cc @@ -11,7 +11,7 @@ #include "rtc_base/fake_clock.h" #include "rtc_base/checks.h" -#include "rtc_base/message_queue.h" +#include "rtc_base/thread.h" namespace rtc { @@ -35,12 +35,12 @@ void ThreadProcessingFakeClock::SetTime(webrtc::Timestamp time) { clock_.SetTime(time); // If message queues are waiting in a socket select() with a timeout provided // by the OS, they should wake up and dispatch all messages that are ready. - MessageQueueManager::ProcessAllMessageQueuesForTesting(); + ThreadManager::ProcessAllMessageQueuesForTesting(); } void ThreadProcessingFakeClock::AdvanceTime(webrtc::TimeDelta delta) { clock_.AdvanceTime(delta); - MessageQueueManager::ProcessAllMessageQueuesForTesting(); + ThreadManager::ProcessAllMessageQueuesForTesting(); } ScopedBaseFakeClock::ScopedBaseFakeClock() { diff --git a/rtc_base/message_handler.cc b/rtc_base/message_handler.cc index dd86e59b30..18a06e241d 100644 --- a/rtc_base/message_handler.cc +++ b/rtc_base/message_handler.cc @@ -10,12 +10,12 @@ #include "rtc_base/message_handler.h" -#include "rtc_base/message_queue.h" +#include "rtc_base/thread.h" namespace rtc { MessageHandler::~MessageHandler() { - MessageQueueManager::Clear(this); + ThreadManager::Clear(this); } } // namespace rtc diff --git a/rtc_base/message_queue.cc b/rtc_base/message_queue.cc deleted file mode 100644 index 9a2e1f226a..0000000000 --- a/rtc_base/message_queue.cc +++ /dev/null @@ -1,523 +0,0 @@ -/* - * Copyright 2004 The WebRTC Project Authors. All rights reserved. - * - * Use of this source code is governed by a BSD-style license - * that can be found in the LICENSE file in the root of the source - * tree. An additional intellectual property rights grant can be found - * in the file PATENTS. All contributing project authors may - * be found in the AUTHORS file in the root of the source tree. - */ -#include "rtc_base/message_queue.h" - -#include -#include - -#include "absl/algorithm/container.h" -#include "rtc_base/atomic_ops.h" -#include "rtc_base/checks.h" -#include "rtc_base/logging.h" -#include "rtc_base/thread.h" -#include "rtc_base/time_utils.h" -#include "rtc_base/trace_event.h" - -namespace rtc { -namespace { - -const int kMaxMsgLatency = 150; // 150 ms -const int kSlowDispatchLoggingThreshold = 50; // 50 ms - -class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { - public: - MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) - RTC_EXCLUSIVE_LOCK_FUNCTION(cs) - : cs_(cs), processing_(processing) { - cs_->Enter(); - *processing_ += 1; - } - - ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() { - *processing_ -= 1; - cs_->Leave(); - } - - private: - const CriticalSection* const cs_; - size_t* processing_; - - RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); -}; -} // namespace - -//------------------------------------------------------------------ -// MessageQueueManager - -MessageQueueManager* MessageQueueManager::Instance() { - static MessageQueueManager* const instance = new MessageQueueManager; - return instance; -} - -MessageQueueManager::MessageQueueManager() : processing_(0) {} - -MessageQueueManager::~MessageQueueManager() {} - -void MessageQueueManager::Add(MessageQueue* message_queue) { - return Instance()->AddInternal(message_queue); -} -void MessageQueueManager::AddInternal(MessageQueue* message_queue) { - CritScope cs(&crit_); - // Prevent changes while the list of message queues is processed. - RTC_DCHECK_EQ(processing_, 0); - message_queues_.push_back(message_queue); -} - -void MessageQueueManager::Remove(MessageQueue* message_queue) { - return Instance()->RemoveInternal(message_queue); -} -void MessageQueueManager::RemoveInternal(MessageQueue* message_queue) { - { - CritScope cs(&crit_); - // Prevent changes while the list of message queues is processed. - RTC_DCHECK_EQ(processing_, 0); - std::vector::iterator iter; - iter = absl::c_find(message_queues_, message_queue); - if (iter != message_queues_.end()) { - message_queues_.erase(iter); - } - } -} - -void MessageQueueManager::Clear(MessageHandler* handler) { - return Instance()->ClearInternal(handler); -} -void MessageQueueManager::ClearInternal(MessageHandler* handler) { - // Deleted objects may cause re-entrant calls to ClearInternal. This is - // allowed as the list of message queues does not change while queues are - // cleared. - MarkProcessingCritScope cs(&crit_, &processing_); - for (MessageQueue* queue : message_queues_) { - queue->Clear(handler); - } -} - -void MessageQueueManager::ProcessAllMessageQueuesForTesting() { - return Instance()->ProcessAllMessageQueuesInternal(); -} - -void MessageQueueManager::ProcessAllMessageQueuesInternal() { - // This works by posting a delayed message at the current time and waiting - // for it to be dispatched on all queues, which will ensure that all messages - // that came before it were also dispatched. - volatile int queues_not_done = 0; - - // This class is used so that whether the posted message is processed, or the - // message queue is simply cleared, queues_not_done gets decremented. - class ScopedIncrement : public MessageData { - public: - ScopedIncrement(volatile int* value) : value_(value) { - AtomicOps::Increment(value_); - } - ~ScopedIncrement() override { AtomicOps::Decrement(value_); } - - private: - volatile int* value_; - }; - - { - MarkProcessingCritScope cs(&crit_, &processing_); - for (MessageQueue* queue : message_queues_) { - if (!queue->IsProcessingMessagesForTesting()) { - // If the queue is not processing messages, it can - // be ignored. If we tried to post a message to it, it would be dropped - // or ignored. - continue; - } - queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, - new ScopedIncrement(&queues_not_done)); - } - } - - rtc::Thread* current = rtc::Thread::Current(); - // Note: One of the message queues may have been on this thread, which is - // why we can't synchronously wait for queues_not_done to go to 0; we need - // to process messages as well. - while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { - if (current) { - current->ProcessMessages(0); - } - } -} - -//------------------------------------------------------------------ -// MessageQueue -MessageQueue::MessageQueue(SocketServer* ss, bool init_queue) - : fPeekKeep_(false), - dmsgq_next_num_(0), - fInitialized_(false), - fDestroyed_(false), - stop_(0), - ss_(ss) { - RTC_DCHECK(ss); - // Currently, MessageQueue holds a socket server, and is the base class for - // Thread. It seems like it makes more sense for Thread to hold the socket - // server, and provide it to the MessageQueue, since the Thread controls - // the I/O model, and MQ is agnostic to those details. Anyway, this causes - // messagequeue_unittest to depend on network libraries... yuck. - if (init_queue) { - DoInit(); - } -} - -MessageQueue::MessageQueue(std::unique_ptr ss, bool init_queue) - : MessageQueue(ss.get(), init_queue) { - own_ss_ = std::move(ss); -} - -MessageQueue::~MessageQueue() { - DoDestroy(); -} - -void MessageQueue::DoInit() { - if (fInitialized_) { - return; - } - - fInitialized_ = true; - MessageQueueManager::Add(this); -} - -void MessageQueue::DoDestroy() { - if (fDestroyed_) { - return; - } - - fDestroyed_ = true; - // The signal is done from here to ensure - // that it always gets called when the queue - // is going away. - SignalQueueDestroyed(); - MessageQueueManager::Remove(this); - ClearInternal(nullptr, MQID_ANY, nullptr); - - if (ss_) { - ss_->SetMessageQueue(nullptr); - } -} - -SocketServer* MessageQueue::socketserver() { - return ss_; -} - -void MessageQueue::WakeUpSocketServer() { - ss_->WakeUp(); -} - -void MessageQueue::Quit() { - AtomicOps::ReleaseStore(&stop_, 1); - WakeUpSocketServer(); -} - -bool MessageQueue::IsQuitting() { - return AtomicOps::AcquireLoad(&stop_) != 0; -} - -bool MessageQueue::IsProcessingMessagesForTesting() { - return !IsQuitting(); -} - -void MessageQueue::Restart() { - AtomicOps::ReleaseStore(&stop_, 0); -} - -bool MessageQueue::Peek(Message* pmsg, int cmsWait) { - if (fPeekKeep_) { - *pmsg = msgPeek_; - return true; - } - if (!Get(pmsg, cmsWait)) - return false; - msgPeek_ = *pmsg; - fPeekKeep_ = true; - return true; -} - -bool MessageQueue::Get(Message* pmsg, int cmsWait, bool process_io) { - // Return and clear peek if present - // Always return the peek if it exists so there is Peek/Get symmetry - - if (fPeekKeep_) { - *pmsg = msgPeek_; - fPeekKeep_ = false; - return true; - } - - // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch - - int64_t cmsTotal = cmsWait; - int64_t cmsElapsed = 0; - int64_t msStart = TimeMillis(); - int64_t msCurrent = msStart; - while (true) { - // Check for sent messages - ReceiveSends(); - - // Check for posted events - int64_t cmsDelayNext = kForever; - bool first_pass = true; - while (true) { - // All queue operations need to be locked, but nothing else in this loop - // (specifically handling disposed message) can happen inside the crit. - // Otherwise, disposed MessageHandlers will cause deadlocks. - { - CritScope cs(&crit_); - // On the first pass, check for delayed messages that have been - // triggered and calculate the next trigger time. - if (first_pass) { - first_pass = false; - while (!dmsgq_.empty()) { - if (msCurrent < dmsgq_.top().msTrigger_) { - cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); - break; - } - msgq_.push_back(dmsgq_.top().msg_); - dmsgq_.pop(); - } - } - // Pull a message off the message queue, if available. - if (msgq_.empty()) { - break; - } else { - *pmsg = msgq_.front(); - msgq_.pop_front(); - } - } // crit_ is released here. - - // Log a warning for time-sensitive messages that we're late to deliver. - if (pmsg->ts_sensitive) { - int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); - if (delay > 0) { - RTC_LOG_F(LS_WARNING) - << "id: " << pmsg->message_id - << " delay: " << (delay + kMaxMsgLatency) << "ms"; - } - } - // If this was a dispose message, delete it and skip it. - if (MQID_DISPOSE == pmsg->message_id) { - RTC_DCHECK(nullptr == pmsg->phandler); - delete pmsg->pdata; - *pmsg = Message(); - continue; - } - return true; - } - - if (IsQuitting()) - break; - - // Which is shorter, the delay wait or the asked wait? - - int64_t cmsNext; - if (cmsWait == kForever) { - cmsNext = cmsDelayNext; - } else { - cmsNext = std::max(0, cmsTotal - cmsElapsed); - if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) - cmsNext = cmsDelayNext; - } - - { - // Wait and multiplex in the meantime - if (!ss_->Wait(static_cast(cmsNext), process_io)) - return false; - } - - // If the specified timeout expired, return - - msCurrent = TimeMillis(); - cmsElapsed = TimeDiff(msCurrent, msStart); - if (cmsWait != kForever) { - if (cmsElapsed >= cmsWait) - return false; - } - } - return false; -} - -void MessageQueue::ReceiveSends() {} - -void MessageQueue::Post(const Location& posted_from, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata, - bool time_sensitive) { - if (IsQuitting()) { - delete pdata; - return; - } - - // Keep thread safe - // Add the message to the end of the queue - // Signal for the multiplexer to return - - { - CritScope cs(&crit_); - Message msg; - msg.posted_from = posted_from; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - if (time_sensitive) { - msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; - } - msgq_.push_back(msg); - } - WakeUpSocketServer(); -} - -void MessageQueue::PostDelayed(const Location& posted_from, - int cmsDelay, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id, - pdata); -} - -void MessageQueue::PostAt(const Location& posted_from, - uint32_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - // This should work even if it is used (unexpectedly). - int64_t delay = static_cast(TimeMillis()) - tstamp; - return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata); -} - -void MessageQueue::PostAt(const Location& posted_from, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, - pdata); -} - -void MessageQueue::DoDelayPost(const Location& posted_from, - int64_t cmsDelay, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata) { - if (IsQuitting()) { - delete pdata; - return; - } - - // Keep thread safe - // Add to the priority queue. Gets sorted soonest first. - // Signal for the multiplexer to return. - - { - CritScope cs(&crit_); - Message msg; - msg.posted_from = posted_from; - msg.phandler = phandler; - msg.message_id = id; - msg.pdata = pdata; - DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); - dmsgq_.push(dmsg); - // If this message queue processes 1 message every millisecond for 50 days, - // we will wrap this number. Even then, only messages with identical times - // will be misordered, and then only briefly. This is probably ok. - ++dmsgq_next_num_; - RTC_DCHECK_NE(0, dmsgq_next_num_); - } - WakeUpSocketServer(); -} - -int MessageQueue::GetDelay() { - CritScope cs(&crit_); - - if (!msgq_.empty()) - return 0; - - if (!dmsgq_.empty()) { - int delay = TimeUntil(dmsgq_.top().msTrigger_); - if (delay < 0) - delay = 0; - return delay; - } - - return kForever; -} - -void MessageQueue::Clear(MessageHandler* phandler, - uint32_t id, - MessageList* removed) { - CritScope cs(&crit_); - ClearInternal(phandler, id, removed); -} - -void MessageQueue::ClearInternal(MessageHandler* phandler, - uint32_t id, - MessageList* removed) { - // Remove messages with phandler - - if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { - if (removed) { - removed->push_back(msgPeek_); - } else { - delete msgPeek_.pdata; - } - fPeekKeep_ = false; - } - - // Remove from ordered message queue - - for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { - if (it->Match(phandler, id)) { - if (removed) { - removed->push_back(*it); - } else { - delete it->pdata; - } - it = msgq_.erase(it); - } else { - ++it; - } - } - - // Remove from priority queue. Not directly iterable, so use this approach - - PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); - for (PriorityQueue::container_type::iterator it = new_end; - it != dmsgq_.container().end(); ++it) { - if (it->msg_.Match(phandler, id)) { - if (removed) { - removed->push_back(it->msg_); - } else { - delete it->msg_.pdata; - } - } else { - *new_end++ = *it; - } - } - dmsgq_.container().erase(new_end, dmsgq_.container().end()); - dmsgq_.reheap(); -} - -void MessageQueue::Dispatch(Message* pmsg) { - TRACE_EVENT2("webrtc", "MessageQueue::Dispatch", "src_file", - pmsg->posted_from.file_name(), "src_func", - pmsg->posted_from.function_name()); - int64_t start_time = TimeMillis(); - pmsg->phandler->OnMessage(pmsg); - int64_t end_time = TimeMillis(); - int64_t diff = TimeDiff(end_time, start_time); - if (diff >= kSlowDispatchLoggingThreshold) { - RTC_LOG(LS_INFO) << "Message took " << diff - << "ms to dispatch. Posted from: " - << pmsg->posted_from.ToString(); - } -} - -} // namespace rtc diff --git a/rtc_base/message_queue.h b/rtc_base/message_queue.h index bb58ebccac..ffad9fe852 100644 --- a/rtc_base/message_queue.h +++ b/rtc_base/message_queue.h @@ -11,324 +11,8 @@ #ifndef RTC_BASE_MESSAGE_QUEUE_H_ #define RTC_BASE_MESSAGE_QUEUE_H_ -#include +// TODO(srte): Remove this file when all dependencies has been updated. -#include -#include -#include -#include -#include - -#include "api/scoped_refptr.h" -#include "rtc_base/constructor_magic.h" -#include "rtc_base/critical_section.h" -#include "rtc_base/location.h" -#include "rtc_base/message_handler.h" -#include "rtc_base/socket_server.h" -#include "rtc_base/system/rtc_export.h" -#include "rtc_base/third_party/sigslot/sigslot.h" -#include "rtc_base/thread_annotations.h" - -namespace rtc { - -struct Message; -class MessageQueue; - -// MessageQueueManager does cleanup of of message queues - -class RTC_EXPORT MessageQueueManager { - public: - static void Add(MessageQueue* message_queue); - static void Remove(MessageQueue* message_queue); - static void Clear(MessageHandler* handler); - - // TODO(nisse): Delete alias, as soon as downstream code is updated. - static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); } - - // For testing purposes, for use with a simulated clock. - // Ensures that all message queues have processed delayed messages - // up until the current point in time. - static void ProcessAllMessageQueuesForTesting(); - - private: - static MessageQueueManager* Instance(); - - MessageQueueManager(); - ~MessageQueueManager(); - - void AddInternal(MessageQueue* message_queue); - void RemoveInternal(MessageQueue* message_queue); - void ClearInternal(MessageHandler* handler); - void ProcessAllMessageQueuesInternal(); - - // This list contains all live MessageQueues. - std::vector message_queues_ RTC_GUARDED_BY(crit_); - - // Methods that don't modify the list of message queues may be called in a - // re-entrant fashion. "processing_" keeps track of the depth of re-entrant - // calls. - CriticalSection crit_; - size_t processing_ RTC_GUARDED_BY(crit_); -}; - -// Derive from this for specialized data -// App manages lifetime, except when messages are purged - -class MessageData { - public: - MessageData() {} - virtual ~MessageData() {} -}; - -template -class TypedMessageData : public MessageData { - public: - explicit TypedMessageData(const T& data) : data_(data) {} - const T& data() const { return data_; } - T& data() { return data_; } - - private: - T data_; -}; - -// Like TypedMessageData, but for pointers that require a delete. -template -class ScopedMessageData : public MessageData { - public: - explicit ScopedMessageData(std::unique_ptr data) - : data_(std::move(data)) {} - // Deprecated. - // TODO(deadbeef): Remove this once downstream applications stop using it. - explicit ScopedMessageData(T* data) : data_(data) {} - // Deprecated. - // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of - // this once downstream applications stop using it, then rename inner_data to - // just data. - const std::unique_ptr& data() const { return data_; } - std::unique_ptr& data() { return data_; } - - const T& inner_data() const { return *data_; } - T& inner_data() { return *data_; } - - private: - std::unique_ptr data_; -}; - -// Like ScopedMessageData, but for reference counted pointers. -template -class ScopedRefMessageData : public MessageData { - public: - explicit ScopedRefMessageData(T* data) : data_(data) {} - const scoped_refptr& data() const { return data_; } - scoped_refptr& data() { return data_; } - - private: - scoped_refptr data_; -}; - -template -inline MessageData* WrapMessageData(const T& data) { - return new TypedMessageData(data); -} - -template -inline const T& UseMessageData(MessageData* data) { - return static_cast*>(data)->data(); -} - -template -class DisposeData : public MessageData { - public: - explicit DisposeData(T* data) : data_(data) {} - virtual ~DisposeData() { delete data_; } - - private: - T* data_; -}; - -const uint32_t MQID_ANY = static_cast(-1); -const uint32_t MQID_DISPOSE = static_cast(-2); - -// No destructor - -struct Message { - Message() - : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {} - inline bool Match(MessageHandler* handler, uint32_t id) const { - return (handler == nullptr || handler == phandler) && - (id == MQID_ANY || id == message_id); - } - Location posted_from; - MessageHandler* phandler; - uint32_t message_id; - MessageData* pdata; - int64_t ts_sensitive; -}; - -typedef std::list MessageList; - -// DelayedMessage goes into a priority queue, sorted by trigger time. Messages -// with the same trigger time are processed in num_ (FIFO) order. - -class DelayedMessage { - public: - DelayedMessage(int64_t delay, - int64_t trigger, - uint32_t num, - const Message& msg) - : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {} - - bool operator<(const DelayedMessage& dmsg) const { - return (dmsg.msTrigger_ < msTrigger_) || - ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); - } - - int64_t cmsDelay_; // for debugging - int64_t msTrigger_; - uint32_t num_; - Message msg_; -}; - -class RTC_EXPORT MessageQueue { - public: - static const int kForever = -1; - - // Create a new MessageQueue and optionally assign it to the passed - // SocketServer. Subclasses that override Clear should pass false for - // init_queue and call DoInit() from their constructor to prevent races - // with the MessageQueueManager using the object while the vtable is still - // being created. - MessageQueue(SocketServer* ss, bool init_queue); - MessageQueue(std::unique_ptr ss, bool init_queue); - - // NOTE: SUBCLASSES OF MessageQueue THAT OVERRIDE Clear MUST CALL - // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race - // between the destructor modifying the vtable, and the MessageQueueManager - // calling Clear on the object from a different thread. - virtual ~MessageQueue(); - - SocketServer* socketserver(); - - // Note: The behavior of MessageQueue has changed. When a MQ is stopped, - // futher Posts and Sends will fail. However, any pending Sends and *ready* - // Posts (as opposed to unexpired delayed Posts) will be delivered before - // Get (or Peek) returns false. By guaranteeing delivery of those messages, - // we eliminate the race condition when an MessageHandler and MessageQueue - // may be destroyed independently of each other. - virtual void Quit(); - virtual bool IsQuitting(); - virtual void Restart(); - // Not all message queues actually process messages (such as SignalThread). - // In those cases, it's important to know, before posting, that it won't be - // Processed. Normally, this would be true until IsQuitting() is true. - virtual bool IsProcessingMessagesForTesting(); - - // Get() will process I/O until: - // 1) A message is available (returns true) - // 2) cmsWait seconds have elapsed (returns false) - // 3) Stop() is called (returns false) - virtual bool Get(Message* pmsg, - int cmsWait = kForever, - bool process_io = true); - virtual bool Peek(Message* pmsg, int cmsWait = 0); - virtual void Post(const Location& posted_from, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr, - bool time_sensitive = false); - virtual void PostDelayed(const Location& posted_from, - int cmsDelay, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - virtual void PostAt(const Location& posted_from, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - // TODO(honghaiz): Remove this when all the dependencies are removed. - virtual void PostAt(const Location& posted_from, - uint32_t tstamp, - MessageHandler* phandler, - uint32_t id = 0, - MessageData* pdata = nullptr); - virtual void Clear(MessageHandler* phandler, - uint32_t id = MQID_ANY, - MessageList* removed = nullptr); - virtual void Dispatch(Message* pmsg); - virtual void ReceiveSends(); - - // Amount of time until the next message can be retrieved - virtual int GetDelay(); - - bool empty() const { return size() == 0u; } - size_t size() const { - CritScope cs(&crit_); // msgq_.size() is not thread safe. - return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); - } - - // Internally posts a message which causes the doomed object to be deleted - template - void Dispose(T* doomed) { - if (doomed) { - Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData(doomed)); - } - } - - // When this signal is sent out, any references to this queue should - // no longer be used. - sigslot::signal0<> SignalQueueDestroyed; - - protected: - class PriorityQueue : public std::priority_queue { - public: - container_type& container() { return c; } - void reheap() { make_heap(c.begin(), c.end(), comp); } - }; - - void DoDelayPost(const Location& posted_from, - int64_t cmsDelay, - int64_t tstamp, - MessageHandler* phandler, - uint32_t id, - MessageData* pdata); - - // Perform initialization, subclasses must call this from their constructor - // if false was passed as init_queue to the MessageQueue constructor. - void DoInit(); - - // Does not take any lock. Must be called either while holding crit_, or by - // the destructor (by definition, the latter has exclusive access). - void ClearInternal(MessageHandler* phandler, - uint32_t id, - MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); - - // Perform cleanup; subclasses must call this from the destructor, - // and are not expected to actually hold the lock. - void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); - - void WakeUpSocketServer(); - - bool fPeekKeep_; - Message msgPeek_; - MessageList msgq_ RTC_GUARDED_BY(crit_); - PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_); - uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_); - CriticalSection crit_; - bool fInitialized_; - bool fDestroyed_; - - private: - volatile int stop_; - - // The SocketServer might not be owned by MessageQueue. - SocketServer* const ss_; - // Used if SocketServer ownership lies with |this|. - std::unique_ptr own_ss_; - - RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(MessageQueue); -}; - -} // namespace rtc +#include "rtc_base/thread.h" #endif // RTC_BASE_MESSAGE_QUEUE_H_ diff --git a/rtc_base/message_queue_unittest.cc b/rtc_base/message_queue_unittest.cc index 0c0cfc4b41..4d3ea95a80 100644 --- a/rtc_base/message_queue_unittest.cc +++ b/rtc_base/message_queue_unittest.cc @@ -8,7 +8,7 @@ * be found in the AUTHORS file in the root of the source tree. */ -#include "rtc_base/message_queue.h" +#include "rtc_base/thread.h" #include @@ -29,14 +29,14 @@ namespace { using ::webrtc::ToQueuedTask; -class MessageQueueTest : public ::testing::Test, public MessageQueue { +class MessageQueueTest : public ::testing::Test, public Thread { public: - MessageQueueTest() : MessageQueue(SocketServer::CreateDefault(), true) {} + MessageQueueTest() : Thread(SocketServer::CreateDefault(), true) {} bool IsLocked_Worker() { - if (!crit_.TryEnter()) { + if (!CritForTest()->TryEnter()) { return true; } - crit_.Leave(); + CritForTest()->Leave(); return false; } bool IsLocked() { @@ -61,8 +61,7 @@ struct DeletedLockChecker { bool* deleted; }; -static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder( - MessageQueue* q) { +static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(Thread* q) { EXPECT_TRUE(q != nullptr); int64_t now = TimeMillis(); q->PostAt(RTC_FROM_HERE, now, nullptr, 3); @@ -83,11 +82,11 @@ static void DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder( TEST_F(MessageQueueTest, DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder) { - MessageQueue q(SocketServer::CreateDefault(), true); + Thread q(SocketServer::CreateDefault(), true); DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q); NullSocketServer nullss; - MessageQueue q_nullss(&nullss, true); + Thread q_nullss(&nullss, true); DelayedPostsWithIdenticalTimesAreProcessedInFifoOrder(&q_nullss); } @@ -127,7 +126,7 @@ TEST_F(MessageQueueTest, DiposeHandlerWithPostedMessagePending) { // Ensure that ProcessAllMessageQueues does its essential function; process // all messages (both delayed and non delayed) up until the current time, on // all registered message queues. -TEST(MessageQueueManager, ProcessAllMessageQueues) { +TEST(ThreadManager, ProcessAllMessageQueues) { Event entered_process_all_message_queues(true, false); auto a = Thread::CreateWithSocketServer(); auto b = Thread::CreateWithSocketServer(); @@ -155,21 +154,21 @@ TEST(MessageQueueManager, ProcessAllMessageQueues) { b->PostDelayedTask(ToQueuedTask(incrementer), 0); rtc::Thread::Current()->PostTask(ToQueuedTask(event_signaler)); - MessageQueueManager::ProcessAllMessageQueuesForTesting(); + ThreadManager::ProcessAllMessageQueuesForTesting(); EXPECT_EQ(4, AtomicOps::AcquireLoad(&messages_processed)); } // Test that ProcessAllMessageQueues doesn't hang if a thread is quitting. -TEST(MessageQueueManager, ProcessAllMessageQueuesWithQuittingThread) { +TEST(ThreadManager, ProcessAllMessageQueuesWithQuittingThread) { auto t = Thread::CreateWithSocketServer(); t->Start(); t->Quit(); - MessageQueueManager::ProcessAllMessageQueuesForTesting(); + ThreadManager::ProcessAllMessageQueuesForTesting(); } // Test that ProcessAllMessageQueues doesn't hang if a queue clears its // messages. -TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) { +TEST(ThreadManager, ProcessAllMessageQueuesWithClearedQueue) { Event entered_process_all_message_queues(true, false); auto t = Thread::CreateWithSocketServer(); t->Start(); @@ -189,7 +188,7 @@ TEST(MessageQueueManager, ProcessAllMessageQueuesWithClearedQueue) { // Post messages (both delayed and non delayed) to both threads. t->PostTask(RTC_FROM_HERE, clearer); rtc::Thread::Current()->PostTask(RTC_FROM_HERE, event_signaler); - MessageQueueManager::ProcessAllMessageQueuesForTesting(); + ThreadManager::ProcessAllMessageQueuesForTesting(); } class RefCountedHandler : public MessageHandler, public rtc::RefCountInterface { @@ -202,7 +201,7 @@ class EmptyHandler : public MessageHandler { void OnMessage(Message* msg) override {} }; -TEST(MessageQueueManager, ClearReentrant) { +TEST(ThreadManager, ClearReentrant) { std::unique_ptr t(Thread::Create()); EmptyHandler handler; RefCountedHandler* inner_handler( diff --git a/rtc_base/thread.cc b/rtc_base/thread.cc index 4bc3863d73..ba5b617418 100644 --- a/rtc_base/thread.cc +++ b/rtc_base/thread.cc @@ -28,6 +28,8 @@ #include +#include "absl/algorithm/container.h" +#include "rtc_base/atomic_ops.h" #include "rtc_base/checks.h" #include "rtc_base/critical_section.h" #include "rtc_base/logging.h" @@ -65,6 +67,9 @@ class ScopedAutoReleasePool { namespace rtc { namespace { +const int kMaxMsgLatency = 150; // 150 ms +const int kSlowDispatchLoggingThreshold = 50; // 50 ms + class MessageHandlerWithTask final : public MessageHandler { public: MessageHandlerWithTask() = default; @@ -80,6 +85,27 @@ class MessageHandlerWithTask final : public MessageHandler { RTC_DISALLOW_COPY_AND_ASSIGN(MessageHandlerWithTask); }; +class RTC_SCOPED_LOCKABLE MarkProcessingCritScope { + public: + MarkProcessingCritScope(const CriticalSection* cs, size_t* processing) + RTC_EXCLUSIVE_LOCK_FUNCTION(cs) + : cs_(cs), processing_(processing) { + cs_->Enter(); + *processing_ += 1; + } + + ~MarkProcessingCritScope() RTC_UNLOCK_FUNCTION() { + *processing_ -= 1; + cs_->Leave(); + } + + private: + const CriticalSection* const cs_; + size_t* processing_; + + RTC_DISALLOW_COPY_AND_ASSIGN(MarkProcessingCritScope); +}; + } // namespace ThreadManager* ThreadManager::Instance() { @@ -92,6 +118,97 @@ ThreadManager::~ThreadManager() { RTC_NOTREACHED() << "ThreadManager should never be destructed."; } +// static +void ThreadManager::Add(Thread* message_queue) { + return Instance()->AddInternal(message_queue); +} +void ThreadManager::AddInternal(Thread* message_queue) { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + message_queues_.push_back(message_queue); +} + +// static +void ThreadManager::Remove(Thread* message_queue) { + return Instance()->RemoveInternal(message_queue); +} +void ThreadManager::RemoveInternal(Thread* message_queue) { + { + CritScope cs(&crit_); + // Prevent changes while the list of message queues is processed. + RTC_DCHECK_EQ(processing_, 0); + std::vector::iterator iter; + iter = absl::c_find(message_queues_, message_queue); + if (iter != message_queues_.end()) { + message_queues_.erase(iter); + } + } +} + +// static +void ThreadManager::Clear(MessageHandler* handler) { + return Instance()->ClearInternal(handler); +} +void ThreadManager::ClearInternal(MessageHandler* handler) { + // Deleted objects may cause re-entrant calls to ClearInternal. This is + // allowed as the list of message queues does not change while queues are + // cleared. + MarkProcessingCritScope cs(&crit_, &processing_); + for (Thread* queue : message_queues_) { + queue->Clear(handler); + } +} + +// static +void ThreadManager::ProcessAllMessageQueuesForTesting() { + return Instance()->ProcessAllMessageQueuesInternal(); +} + +void ThreadManager::ProcessAllMessageQueuesInternal() { + // This works by posting a delayed message at the current time and waiting + // for it to be dispatched on all queues, which will ensure that all messages + // that came before it were also dispatched. + volatile int queues_not_done = 0; + + // This class is used so that whether the posted message is processed, or the + // message queue is simply cleared, queues_not_done gets decremented. + class ScopedIncrement : public MessageData { + public: + ScopedIncrement(volatile int* value) : value_(value) { + AtomicOps::Increment(value_); + } + ~ScopedIncrement() override { AtomicOps::Decrement(value_); } + + private: + volatile int* value_; + }; + + { + MarkProcessingCritScope cs(&crit_, &processing_); + for (Thread* queue : message_queues_) { + if (!queue->IsProcessingMessagesForTesting()) { + // If the queue is not processing messages, it can + // be ignored. If we tried to post a message to it, it would be dropped + // or ignored. + continue; + } + queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, + new ScopedIncrement(&queues_not_done)); + } + } + + rtc::Thread* current = rtc::Thread::Current(); + // Note: One of the message queues may have been on this thread, which is + // why we can't synchronously wait for queues_not_done to go to 0; we need + // to process messages as well. + while (AtomicOps::AcquireLoad(&queues_not_done) > 0) { + if (current) { + current->ProcessMessages(0); + } + } +} + // static Thread* Thread::Current() { ThreadManager* manager = ThreadManager::Instance(); @@ -180,8 +297,14 @@ Thread::Thread(std::unique_ptr ss) : Thread(std::move(ss), /*do_init=*/true) {} Thread::Thread(SocketServer* ss, bool do_init) - : MessageQueue(ss, /*do_init=*/false) { - socketserver()->SetMessageQueue(this); + : fPeekKeep_(false), + dmsgq_next_num_(0), + fInitialized_(false), + fDestroyed_(false), + stop_(0), + ss_(ss) { + RTC_DCHECK(ss); + ss_->SetMessageQueue(this); SetName("Thread", this); // default name if (do_init) { DoInit(); @@ -189,12 +312,8 @@ Thread::Thread(SocketServer* ss, bool do_init) } Thread::Thread(std::unique_ptr ss, bool do_init) - : MessageQueue(std::move(ss), false) { - socketserver()->SetMessageQueue(this); - SetName("Thread", this); // default name - if (do_init) { - DoInit(); - } + : Thread(ss.get(), do_init) { + own_ss_ = std::move(ss); } Thread::~Thread() { @@ -202,6 +321,337 @@ Thread::~Thread() { DoDestroy(); } +void Thread::DoInit() { + if (fInitialized_) { + return; + } + + fInitialized_ = true; + ThreadManager::Add(this); +} + +void Thread::DoDestroy() { + if (fDestroyed_) { + return; + } + + fDestroyed_ = true; + // The signal is done from here to ensure + // that it always gets called when the queue + // is going away. + SignalQueueDestroyed(); + ThreadManager::Remove(this); + ClearInternal(nullptr, MQID_ANY, nullptr); + + if (ss_) { + ss_->SetMessageQueue(nullptr); + } +} + +SocketServer* Thread::socketserver() { + return ss_; +} + +void Thread::WakeUpSocketServer() { + ss_->WakeUp(); +} + +void Thread::Quit() { + AtomicOps::ReleaseStore(&stop_, 1); + WakeUpSocketServer(); +} + +bool Thread::IsQuitting() { + return AtomicOps::AcquireLoad(&stop_) != 0; +} + +void Thread::Restart() { + AtomicOps::ReleaseStore(&stop_, 0); +} + +bool Thread::Peek(Message* pmsg, int cmsWait) { + if (fPeekKeep_) { + *pmsg = msgPeek_; + return true; + } + if (!Get(pmsg, cmsWait)) + return false; + msgPeek_ = *pmsg; + fPeekKeep_ = true; + return true; +} + +bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) { + // Return and clear peek if present + // Always return the peek if it exists so there is Peek/Get symmetry + + if (fPeekKeep_) { + *pmsg = msgPeek_; + fPeekKeep_ = false; + return true; + } + + // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch + + int64_t cmsTotal = cmsWait; + int64_t cmsElapsed = 0; + int64_t msStart = TimeMillis(); + int64_t msCurrent = msStart; + while (true) { + // Check for sent messages + ReceiveSends(); + + // Check for posted events + int64_t cmsDelayNext = kForever; + bool first_pass = true; + while (true) { + // All queue operations need to be locked, but nothing else in this loop + // (specifically handling disposed message) can happen inside the crit. + // Otherwise, disposed MessageHandlers will cause deadlocks. + { + CritScope cs(&crit_); + // On the first pass, check for delayed messages that have been + // triggered and calculate the next trigger time. + if (first_pass) { + first_pass = false; + while (!dmsgq_.empty()) { + if (msCurrent < dmsgq_.top().msTrigger_) { + cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); + break; + } + msgq_.push_back(dmsgq_.top().msg_); + dmsgq_.pop(); + } + } + // Pull a message off the message queue, if available. + if (msgq_.empty()) { + break; + } else { + *pmsg = msgq_.front(); + msgq_.pop_front(); + } + } // crit_ is released here. + + // Log a warning for time-sensitive messages that we're late to deliver. + if (pmsg->ts_sensitive) { + int64_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); + if (delay > 0) { + RTC_LOG_F(LS_WARNING) + << "id: " << pmsg->message_id + << " delay: " << (delay + kMaxMsgLatency) << "ms"; + } + } + // If this was a dispose message, delete it and skip it. + if (MQID_DISPOSE == pmsg->message_id) { + RTC_DCHECK(nullptr == pmsg->phandler); + delete pmsg->pdata; + *pmsg = Message(); + continue; + } + return true; + } + + if (IsQuitting()) + break; + + // Which is shorter, the delay wait or the asked wait? + + int64_t cmsNext; + if (cmsWait == kForever) { + cmsNext = cmsDelayNext; + } else { + cmsNext = std::max(0, cmsTotal - cmsElapsed); + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) + cmsNext = cmsDelayNext; + } + + { + // Wait and multiplex in the meantime + if (!ss_->Wait(static_cast(cmsNext), process_io)) + return false; + } + + // If the specified timeout expired, return + + msCurrent = TimeMillis(); + cmsElapsed = TimeDiff(msCurrent, msStart); + if (cmsWait != kForever) { + if (cmsElapsed >= cmsWait) + return false; + } + } + return false; +} + +void Thread::Post(const Location& posted_from, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata, + bool time_sensitive) { + if (IsQuitting()) { + delete pdata; + return; + } + + // Keep thread safe + // Add the message to the end of the queue + // Signal for the multiplexer to return + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + if (time_sensitive) { + msg.ts_sensitive = TimeMillis() + kMaxMsgLatency; + } + msgq_.push_back(msg); + } + WakeUpSocketServer(); +} + +void Thread::PostDelayed(const Location& posted_from, + int cmsDelay, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, cmsDelay, TimeAfter(cmsDelay), phandler, id, + pdata); +} + +void Thread::PostAt(const Location& posted_from, + uint32_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + // This should work even if it is used (unexpectedly). + int64_t delay = static_cast(TimeMillis()) - tstamp; + return DoDelayPost(posted_from, delay, tstamp, phandler, id, pdata); +} + +void Thread::PostAt(const Location& posted_from, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + return DoDelayPost(posted_from, TimeUntil(tstamp), tstamp, phandler, id, + pdata); +} + +void Thread::DoDelayPost(const Location& posted_from, + int64_t cmsDelay, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata) { + if (IsQuitting()) { + delete pdata; + return; + } + + // Keep thread safe + // Add to the priority queue. Gets sorted soonest first. + // Signal for the multiplexer to return. + + { + CritScope cs(&crit_); + Message msg; + msg.posted_from = posted_from; + msg.phandler = phandler; + msg.message_id = id; + msg.pdata = pdata; + DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); + dmsgq_.push(dmsg); + // If this message queue processes 1 message every millisecond for 50 days, + // we will wrap this number. Even then, only messages with identical times + // will be misordered, and then only briefly. This is probably ok. + ++dmsgq_next_num_; + RTC_DCHECK_NE(0, dmsgq_next_num_); + } + WakeUpSocketServer(); +} + +int Thread::GetDelay() { + CritScope cs(&crit_); + + if (!msgq_.empty()) + return 0; + + if (!dmsgq_.empty()) { + int delay = TimeUntil(dmsgq_.top().msTrigger_); + if (delay < 0) + delay = 0; + return delay; + } + + return kForever; +} + +void Thread::ClearInternal(MessageHandler* phandler, + uint32_t id, + MessageList* removed) { + // Remove messages with phandler + + if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { + if (removed) { + removed->push_back(msgPeek_); + } else { + delete msgPeek_.pdata; + } + fPeekKeep_ = false; + } + + // Remove from ordered message queue + + for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { + if (it->Match(phandler, id)) { + if (removed) { + removed->push_back(*it); + } else { + delete it->pdata; + } + it = msgq_.erase(it); + } else { + ++it; + } + } + + // Remove from priority queue. Not directly iterable, so use this approach + + PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); + for (PriorityQueue::container_type::iterator it = new_end; + it != dmsgq_.container().end(); ++it) { + if (it->msg_.Match(phandler, id)) { + if (removed) { + removed->push_back(it->msg_); + } else { + delete it->msg_.pdata; + } + } else { + *new_end++ = *it; + } + } + dmsgq_.container().erase(new_end, dmsgq_.container().end()); + dmsgq_.reheap(); +} + +void Thread::Dispatch(Message* pmsg) { + TRACE_EVENT2("webrtc", "Thread::Dispatch", "src_file", + pmsg->posted_from.file_name(), "src_func", + pmsg->posted_from.function_name()); + int64_t start_time = TimeMillis(); + pmsg->phandler->OnMessage(pmsg); + int64_t end_time = TimeMillis(); + int64_t diff = TimeDiff(end_time, start_time); + if (diff >= kSlowDispatchLoggingThreshold) { + RTC_LOG(LS_INFO) << "Message took " << diff + << "ms to dispatch. Posted from: " + << pmsg->posted_from.ToString(); + } +} + bool Thread::IsCurrent() const { return ThreadManager::Instance()->CurrentThread() == this; } @@ -379,7 +829,7 @@ bool Thread::IsOwned() { } void Thread::Stop() { - MessageQueue::Quit(); + Thread::Quit(); Join(); } @@ -436,7 +886,7 @@ void Thread::Send(const Location& posted_from, crit_.Leave(); // Our Wait loop above may have consumed some WakeUp events for this - // MessageQueue, that weren't relevant to this Send. Losing these WakeUps can + // Thread, that weren't relevant to this Send. Losing these WakeUps can // cause problems for some SocketServers. // // Concrete example: @@ -510,7 +960,7 @@ void Thread::QueuedTaskHandler::OnMessage(Message* msg) { RTC_DCHECK(msg); auto* data = static_cast*>(msg->pdata); std::unique_ptr task = std::move(data->data()); - // MessageQueue expects handler to own Message::pdata when OnMessage is called + // Thread expects handler to own Message::pdata when OnMessage is called // Since MessageData is no longer needed, delete it. delete data; @@ -542,8 +992,7 @@ void Thread::Delete() { } bool Thread::IsProcessingMessagesForTesting() { - return (owned_ || IsCurrent()) && - MessageQueue::IsProcessingMessagesForTesting(); + return (owned_ || IsCurrent()) && !IsQuitting(); } void Thread::Clear(MessageHandler* phandler, @@ -642,7 +1091,7 @@ MessageHandler* Thread::GetPostTaskMessageHandler() { AutoThread::AutoThread() : Thread(SocketServer::CreateDefault(), /*do_init=*/false) { if (!ThreadManager::Instance()->CurrentThread()) { - // DoInit registers with MessageQueueManager. Do that only if we intend to + // DoInit registers with ThreadManager. Do that only if we intend to // be rtc::Thread::Current(), otherwise ProcessAllMessageQueuesInternal will // post a message to a queue that no running thread is serving. DoInit(); @@ -667,7 +1116,7 @@ AutoSocketServerThread::AutoSocketServerThread(SocketServer* ss) rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); rtc::ThreadManager::Instance()->SetCurrentThread(this); if (old_thread_) { - MessageQueueManager::Remove(old_thread_); + ThreadManager::Remove(old_thread_); } } @@ -679,7 +1128,7 @@ AutoSocketServerThread::~AutoSocketServerThread() { // cricket::Connection::Destroy. ProcessMessages(0); // Stop and destroy the thread before clearing it as the current thread. - // Sometimes there are messages left in the MessageQueue that will be + // Sometimes there are messages left in the Thread that will be // destroyed by DoDestroy, and sometimes the destructors of the message and/or // its contents rely on this thread still being set as the current thread. Stop(); @@ -687,7 +1136,7 @@ AutoSocketServerThread::~AutoSocketServerThread() { rtc::ThreadManager::Instance()->SetCurrentThread(nullptr); rtc::ThreadManager::Instance()->SetCurrentThread(old_thread_); if (old_thread_) { - MessageQueueManager::Add(old_thread_); + ThreadManager::Add(old_thread_); } } diff --git a/rtc_base/thread.h b/rtc_base/thread.h index fb40a54b80..b8af583e78 100644 --- a/rtc_base/thread.h +++ b/rtc_base/thread.h @@ -15,8 +15,10 @@ #include #include +#include #include #include +#include #if defined(WEBRTC_POSIX) #include @@ -25,13 +27,14 @@ #include "api/task_queue/queued_task.h" #include "api/task_queue/task_queue_base.h" #include "rtc_base/constructor_magic.h" +#include "rtc_base/critical_section.h" #include "rtc_base/location.h" #include "rtc_base/message_handler.h" -#include "rtc_base/message_queue.h" #include "rtc_base/platform_thread_types.h" #include "rtc_base/socket_server.h" #include "rtc_base/system/rtc_export.h" #include "rtc_base/thread_annotations.h" +#include "rtc_base/thread_message.h" #if defined(WEBRTC_WIN) #include "rtc_base/win32.h" @@ -73,6 +76,18 @@ class RTC_EXPORT ThreadManager { // Singleton, constructor and destructor are private. static ThreadManager* Instance(); + static void Add(Thread* message_queue); + static void Remove(Thread* message_queue); + static void Clear(MessageHandler* handler); + + // TODO(nisse): Delete alias, as soon as downstream code is updated. + static void ProcessAllMessageQueues() { ProcessAllMessageQueuesForTesting(); } + + // For testing purposes, for use with a simulated clock. + // Ensures that all message queues have processed delayed messages + // up until the current point in time. + static void ProcessAllMessageQueuesForTesting(); + Thread* CurrentThread(); void SetCurrentThread(Thread* thread); @@ -98,6 +113,20 @@ class RTC_EXPORT ThreadManager { ThreadManager(); ~ThreadManager(); + void AddInternal(Thread* message_queue); + void RemoveInternal(Thread* message_queue); + void ClearInternal(MessageHandler* handler); + void ProcessAllMessageQueuesInternal(); + + // This list contains all live Threads. + std::vector message_queues_ RTC_GUARDED_BY(crit_); + + // Methods that don't modify the list of message queues may be called in a + // re-entrant fashion. "processing_" keeps track of the depth of re-entrant + // calls. + CriticalSection crit_; + size_t processing_ RTC_GUARDED_BY(crit_) = 0; + #if defined(WEBRTC_POSIX) pthread_key_t key_; #endif @@ -121,11 +150,18 @@ struct _SendMessage { // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). -class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, - public webrtc::TaskQueueBase { +class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase { public: + static const int kForever = -1; + + // Create a new Thread and optionally assign it to the passed + // SocketServer. Subclasses that override Clear should pass false for + // init_queue and call DoInit() from their constructor to prevent races + // with the ThreadManager using the object while the vtable is still + // being created. explicit Thread(SocketServer* ss); explicit Thread(std::unique_ptr ss); + // Constructors meant for subclasses; they should call DoInit themselves and // pass false for |do_init|, so that DoInit is called only on the fully // instantiated class, which avoids a vptr data race. @@ -136,6 +172,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, // guarantee Stop() is explicitly called before the subclass is destroyed). // This is required to avoid a data race between the destructor modifying the // vtable, and the Thread::PreRun calling the virtual method Run(). + + // NOTE: SUBCLASSES OF Thread THAT OVERRIDE Clear MUST CALL + // DoDestroy() IN THEIR DESTRUCTORS! This is required to avoid a data race + // between the destructor modifying the vtable, and the ThreadManager + // calling Clear on the object from a different thread. ~Thread() override; static std::unique_ptr CreateWithSocketServer(); @@ -159,6 +200,78 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, const bool previous_state_; }; + SocketServer* socketserver(); + + // Note: The behavior of Thread has changed. When a thread is stopped, + // futher Posts and Sends will fail. However, any pending Sends and *ready* + // Posts (as opposed to unexpired delayed Posts) will be delivered before + // Get (or Peek) returns false. By guaranteeing delivery of those messages, + // we eliminate the race condition when an MessageHandler and Thread + // may be destroyed independently of each other. + virtual void Quit(); + virtual bool IsQuitting(); + virtual void Restart(); + // Not all message queues actually process messages (such as SignalThread). + // In those cases, it's important to know, before posting, that it won't be + // Processed. Normally, this would be true until IsQuitting() is true. + virtual bool IsProcessingMessagesForTesting(); + + // Get() will process I/O until: + // 1) A message is available (returns true) + // 2) cmsWait seconds have elapsed (returns false) + // 3) Stop() is called (returns false) + virtual bool Get(Message* pmsg, + int cmsWait = kForever, + bool process_io = true); + virtual bool Peek(Message* pmsg, int cmsWait = 0); + virtual void Post(const Location& posted_from, + MessageHandler* phandler, + uint32_t id = 0, + MessageData* pdata = nullptr, + bool time_sensitive = false); + virtual void PostDelayed(const Location& posted_from, + int cmsDelay, + MessageHandler* phandler, + uint32_t id = 0, + MessageData* pdata = nullptr); + virtual void PostAt(const Location& posted_from, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id = 0, + MessageData* pdata = nullptr); + // TODO(honghaiz): Remove this when all the dependencies are removed. + virtual void PostAt(const Location& posted_from, + uint32_t tstamp, + MessageHandler* phandler, + uint32_t id = 0, + MessageData* pdata = nullptr); + virtual void Clear(MessageHandler* phandler, + uint32_t id = MQID_ANY, + MessageList* removed = nullptr); + virtual void Dispatch(Message* pmsg); + virtual void ReceiveSends(); + + // Amount of time until the next message can be retrieved + virtual int GetDelay(); + + bool empty() const { return size() == 0u; } + size_t size() const { + CritScope cs(&crit_); // msgq_.size() is not thread safe. + return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); + } + + // Internally posts a message which causes the doomed object to be deleted + template + void Dispose(T* doomed) { + if (doomed) { + Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData(doomed)); + } + } + + // When this signal is sent out, any references to this queue should + // no longer be used. + sigslot::signal0<> SignalQueueDestroyed; + bool IsCurrent() const; // Sleeps the calling thread for the specified number of milliseconds, during @@ -176,7 +289,7 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, // Tells the thread to stop and waits until it is joined. // Never call Stop on the current thread. Instead use the inherited Quit - // function which will exit the base MessageQueue without terminating the + // function which will exit the base Thread without terminating the // underlying OS thread. virtual void Stop(); @@ -272,13 +385,6 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, uint32_t milliseconds) override; void Delete() override; - // From MessageQueue - bool IsProcessingMessagesForTesting() override; - void Clear(MessageHandler* phandler, - uint32_t id = MQID_ANY, - MessageList* removed = nullptr) override; - void ReceiveSends() override; - // ProcessMessages will process I/O and dispatch messages until: // 1) cms milliseconds have elapsed (returns true) // 2) Stop() is called (returns false) @@ -321,6 +427,35 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, #endif protected: + class PriorityQueue : public std::priority_queue { + public: + container_type& container() { return c; } + void reheap() { make_heap(c.begin(), c.end(), comp); } + }; + + void DoDelayPost(const Location& posted_from, + int64_t cmsDelay, + int64_t tstamp, + MessageHandler* phandler, + uint32_t id, + MessageData* pdata); + + // Perform initialization, subclasses must call this from their constructor + // if false was passed as init_queue to the Thread constructor. + void DoInit(); + + // Does not take any lock. Must be called either while holding crit_, or by + // the destructor (by definition, the latter has exclusive access). + void ClearInternal(MessageHandler* phandler, + uint32_t id, + MessageList* removed) RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); + + // Perform cleanup; subclasses must call this from the destructor, + // and are not expected to actually hold the lock. + void DoDestroy() RTC_EXCLUSIVE_LOCKS_REQUIRED(&crit_); + + void WakeUpSocketServer(); + // Same as WrapCurrent except that it never fails as it does not try to // acquire the synchronization access of the thread. The caller should never // call Stop() or Join() on this thread. @@ -333,6 +468,8 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, friend class ScopedDisallowBlockingCalls; + CriticalSection* CritForTest() { return &crit_; } + private: class QueuedTaskHandler final : public MessageHandler { public: @@ -377,6 +514,22 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public MessageQueue, // MessageLikeTask payload data. static MessageHandler* GetPostTaskMessageHandler(); + bool fPeekKeep_; + Message msgPeek_; + MessageList msgq_ RTC_GUARDED_BY(crit_); + PriorityQueue dmsgq_ RTC_GUARDED_BY(crit_); + uint32_t dmsgq_next_num_ RTC_GUARDED_BY(crit_); + CriticalSection crit_; + bool fInitialized_; + bool fDestroyed_; + + volatile int stop_; + + // The SocketServer might not be owned by Thread. + SocketServer* const ss_; + // Used if SocketServer ownership lies with |this|. + std::unique_ptr own_ss_; + std::list<_SendMessage> sendlist_; std::string name_; @@ -437,6 +590,10 @@ class AutoSocketServerThread : public Thread { RTC_DISALLOW_COPY_AND_ASSIGN(AutoSocketServerThread); }; +// TODO(srte): Remove these when all dependencies has been updated. +using MessageQueue = Thread; +using MessageQueueManager = ThreadManager; + } // namespace rtc #endif // RTC_BASE_THREAD_H_ diff --git a/rtc_base/thread_message.h b/rtc_base/thread_message.h new file mode 100644 index 0000000000..1f6af1a940 --- /dev/null +++ b/rtc_base/thread_message.h @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2020 The WebRTC project authors. All Rights Reserved. + * + * Use of this source code is governed by a BSD-style license + * that can be found in the LICENSE file in the root of the source + * tree. An additional intellectual property rights grant can be found + * in the file PATENTS. All contributing project authors may + * be found in the AUTHORS file in the root of the source tree. + */ +#ifndef RTC_BASE_THREAD_MESSAGE_H_ +#define RTC_BASE_THREAD_MESSAGE_H_ + +#include +#include +#include + +#include "api/scoped_refptr.h" +#include "rtc_base/location.h" +#include "rtc_base/message_handler.h" + +namespace rtc { + +// Derive from this for specialized data +// App manages lifetime, except when messages are purged + +class MessageData { + public: + MessageData() {} + virtual ~MessageData() {} +}; + +template +class TypedMessageData : public MessageData { + public: + explicit TypedMessageData(const T& data) : data_(data) {} + const T& data() const { return data_; } + T& data() { return data_; } + + private: + T data_; +}; + +// Like TypedMessageData, but for pointers that require a delete. +template +class ScopedMessageData : public MessageData { + public: + explicit ScopedMessageData(std::unique_ptr data) + : data_(std::move(data)) {} + // Deprecated. + // TODO(deadbeef): Remove this once downstream applications stop using it. + explicit ScopedMessageData(T* data) : data_(data) {} + // Deprecated. + // TODO(deadbeef): Returning a reference to a unique ptr? Why. Get rid of + // this once downstream applications stop using it, then rename inner_data to + // just data. + const std::unique_ptr& data() const { return data_; } + std::unique_ptr& data() { return data_; } + + const T& inner_data() const { return *data_; } + T& inner_data() { return *data_; } + + private: + std::unique_ptr data_; +}; + +// Like ScopedMessageData, but for reference counted pointers. +template +class ScopedRefMessageData : public MessageData { + public: + explicit ScopedRefMessageData(T* data) : data_(data) {} + const scoped_refptr& data() const { return data_; } + scoped_refptr& data() { return data_; } + + private: + scoped_refptr data_; +}; + +template +inline MessageData* WrapMessageData(const T& data) { + return new TypedMessageData(data); +} + +template +inline const T& UseMessageData(MessageData* data) { + return static_cast*>(data)->data(); +} + +template +class DisposeData : public MessageData { + public: + explicit DisposeData(T* data) : data_(data) {} + virtual ~DisposeData() { delete data_; } + + private: + T* data_; +}; + +const uint32_t MQID_ANY = static_cast(-1); +const uint32_t MQID_DISPOSE = static_cast(-2); + +// No destructor + +struct Message { + Message() + : phandler(nullptr), message_id(0), pdata(nullptr), ts_sensitive(0) {} + inline bool Match(MessageHandler* handler, uint32_t id) const { + return (handler == nullptr || handler == phandler) && + (id == MQID_ANY || id == message_id); + } + Location posted_from; + MessageHandler* phandler; + uint32_t message_id; + MessageData* pdata; + int64_t ts_sensitive; +}; + +typedef std::list MessageList; + +// DelayedMessage goes into a priority queue, sorted by trigger time. Messages +// with the same trigger time are processed in num_ (FIFO) order. + +class DelayedMessage { + public: + DelayedMessage(int64_t delay, + int64_t trigger, + uint32_t num, + const Message& msg) + : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) {} + + bool operator<(const DelayedMessage& dmsg) const { + return (dmsg.msTrigger_ < msTrigger_) || + ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); + } + + int64_t cmsDelay_; // for debugging + int64_t msTrigger_; + uint32_t num_; + Message msg_; +}; +} // namespace rtc +#endif // RTC_BASE_THREAD_MESSAGE_H_ diff --git a/sdk/android/src/jni/android_media_decoder.cc b/sdk/android/src/jni/android_media_decoder.cc index 8e2e25fa86..f61db2ad3c 100644 --- a/sdk/android/src/jni/android_media_decoder.cc +++ b/sdk/android/src/jni/android_media_decoder.cc @@ -264,7 +264,7 @@ int32_t MediaCodecVideoDecoder::ResetDecodeOnCodecThread() { << ". Frames decoded: " << frames_decoded_; inited_ = false; - rtc::MessageQueueManager::Clear(this); + rtc::ThreadManager::Clear(this); ResetVariables(); Java_MediaCodecVideoDecoder_reset(jni, j_media_codec_video_decoder_, @@ -300,7 +300,7 @@ int32_t MediaCodecVideoDecoder::ReleaseOnCodecThread() { input_buffers_.clear(); Java_MediaCodecVideoDecoder_release(jni, j_media_codec_video_decoder_); inited_ = false; - rtc::MessageQueueManager::Clear(this); + rtc::ThreadManager::Clear(this); if (CheckException(jni)) { ALOGE << "Decoder release exception"; return WEBRTC_VIDEO_CODEC_ERROR; diff --git a/sdk/objc/unittests/RTCAudioDevice_xctest.mm b/sdk/objc/unittests/RTCAudioDevice_xctest.mm index 10d3994743..a3db613dfe 100644 --- a/sdk/objc/unittests/RTCAudioDevice_xctest.mm +++ b/sdk/objc/unittests/RTCAudioDevice_xctest.mm @@ -96,7 +96,7 @@ [self.audioSession notifyDidBeginInterruption]; // Wait for notification to propagate. - rtc::MessageQueueManager::ProcessAllMessageQueuesForTesting(); + rtc::ThreadManager::ProcessAllMessageQueuesForTesting(); XCTAssertTrue(_audio_device->IsInterrupted()); // Force it for testing. @@ -104,7 +104,7 @@ [self.audioSession notifyDidEndInterruptionWithShouldResumeSession:YES]; // Wait for notification to propagate. - rtc::MessageQueueManager::ProcessAllMessageQueuesForTesting(); + rtc::ThreadManager::ProcessAllMessageQueuesForTesting(); XCTAssertTrue(_audio_device->IsInterrupted()); _audio_device->Init();