Protect MessageQueue stop field with a critical section to avoid data races.
Review-Url: https://codereview.webrtc.org/2023193002 Cr-Commit-Position: refs/heads/master@{#13430}
This commit is contained in:
@ -150,8 +150,12 @@ void MessageQueueManager::ProcessAllMessageQueuesInternal() {
|
||||
//------------------------------------------------------------------
|
||||
// MessageQueue
|
||||
MessageQueue::MessageQueue(SocketServer* ss, bool init_queue)
|
||||
: fStop_(false), fPeekKeep_(false),
|
||||
dmsgq_next_num_(0), fInitialized_(false), fDestroyed_(false), ss_(ss) {
|
||||
: fPeekKeep_(false),
|
||||
dmsgq_next_num_(0),
|
||||
fInitialized_(false),
|
||||
fDestroyed_(false),
|
||||
stop_(0),
|
||||
ss_(ss) {
|
||||
RTC_DCHECK(ss);
|
||||
// Currently, MessageQueue holds a socket server, and is the base class for
|
||||
// Thread. It seems like it makes more sense for Thread to hold the socket
|
||||
@ -223,16 +227,16 @@ void MessageQueue::WakeUpSocketServer() {
|
||||
}
|
||||
|
||||
void MessageQueue::Quit() {
|
||||
fStop_ = true;
|
||||
AtomicOps::ReleaseStore(&stop_, 1);
|
||||
WakeUpSocketServer();
|
||||
}
|
||||
|
||||
bool MessageQueue::IsQuitting() {
|
||||
return fStop_;
|
||||
return AtomicOps::AcquireLoad(&stop_) != 0;
|
||||
}
|
||||
|
||||
void MessageQueue::Restart() {
|
||||
fStop_ = false;
|
||||
AtomicOps::ReleaseStore(&stop_, 0);
|
||||
}
|
||||
|
||||
bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
|
||||
@ -316,7 +320,7 @@ bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (fStop_)
|
||||
if (IsQuitting())
|
||||
break;
|
||||
|
||||
// Which is shorter, the delay wait or the asked wait?
|
||||
@ -357,7 +361,7 @@ void MessageQueue::Post(const Location& posted_from,
|
||||
uint32_t id,
|
||||
MessageData* pdata,
|
||||
bool time_sensitive) {
|
||||
if (fStop_)
|
||||
if (IsQuitting())
|
||||
return;
|
||||
|
||||
// Keep thread safe
|
||||
@ -413,7 +417,7 @@ void MessageQueue::DoDelayPost(const Location& posted_from,
|
||||
MessageHandler* phandler,
|
||||
uint32_t id,
|
||||
MessageData* pdata) {
|
||||
if (fStop_) {
|
||||
if (IsQuitting()) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@ -288,7 +288,6 @@ class MessageQueue {
|
||||
|
||||
void WakeUpSocketServer();
|
||||
|
||||
bool fStop_;
|
||||
bool fPeekKeep_;
|
||||
Message msgPeek_;
|
||||
MessageList msgq_ GUARDED_BY(crit_);
|
||||
@ -299,6 +298,8 @@ class MessageQueue {
|
||||
bool fDestroyed_;
|
||||
|
||||
private:
|
||||
volatile int stop_;
|
||||
|
||||
// The SocketServer might not be owned by MessageQueue.
|
||||
SocketServer* ss_ GUARDED_BY(ss_lock_);
|
||||
// Used if SocketServer ownership lies with |this|.
|
||||
|
||||
@ -218,7 +218,7 @@ bool Thread::Start(Runnable* runnable) {
|
||||
ASSERT(!running());
|
||||
if (running()) return false;
|
||||
|
||||
Restart(); // reset fStop_ if the thread is being restarted
|
||||
Restart(); // reset IsQuitting() if the thread is being restarted
|
||||
|
||||
// Make sure that ThreadManager is created on the main thread before
|
||||
// we start a new thread.
|
||||
@ -346,7 +346,7 @@ void Thread::Send(const Location& posted_from,
|
||||
MessageHandler* phandler,
|
||||
uint32_t id,
|
||||
MessageData* pdata) {
|
||||
if (fStop_)
|
||||
if (IsQuitting())
|
||||
return;
|
||||
|
||||
// Sent messages are sent to the MessageHandler directly, in the context
|
||||
|
||||
@ -24,7 +24,6 @@ char kTSanDefaultSuppressions[] =
|
||||
// WebRTC specific suppressions.
|
||||
|
||||
// Split up suppressions covered previously by thread.cc and messagequeue.cc.
|
||||
"race:rtc::MessageQueue::Quit\n"
|
||||
"race:vp8cx_remove_encoder_threads\n"
|
||||
"race:third_party/libvpx/source/libvpx/vp9/common/vp9_scan.h\n"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user