Fixing potential AsyncInvoker deadlock that occurs for "reentrant" invocations.
The deadlock occurs if the AsyncInvoker is destroyed on thread A while a task on thread B is running, which AsyncInvokes a task back on thread A. This was causing pending_invocations_ to end up negative, because an AsyncClosure that's never added to a thread's message queue (due to the "destroying_" flag) caused the count to be decremented but not incremented. BUG=None Review-Url: https://codereview.webrtc.org/2885143006 Cr-Commit-Position: refs/heads/master@{#18225}
This commit is contained in:
@ -27,7 +27,7 @@ class AsyncInvoker;
|
||||
// on the calling thread if necessary.
|
||||
class AsyncClosure {
|
||||
public:
|
||||
explicit AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {}
|
||||
explicit AsyncClosure(AsyncInvoker* invoker);
|
||||
virtual ~AsyncClosure();
|
||||
// Runs the asynchronous task, and triggers a callback to the calling
|
||||
// thread if needed. Should be called from the target thread.
|
||||
|
||||
@ -19,7 +19,7 @@ namespace rtc {
|
||||
AsyncInvoker::AsyncInvoker() : invocation_complete_(false, false) {}
|
||||
|
||||
AsyncInvoker::~AsyncInvoker() {
|
||||
destroying_ = true;
|
||||
AtomicOps::Increment(&destroying_);
|
||||
// Messages for this need to be cleared *before* our destructor is complete.
|
||||
MessageQueueManager::Clear(this);
|
||||
// And we need to wait for any invocations that are still in progress on
|
||||
@ -44,7 +44,8 @@ void AsyncInvoker::OnMessage(Message* msg) {
|
||||
}
|
||||
|
||||
void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
|
||||
if (destroying_) return;
|
||||
if (AtomicOps::AcquireLoad(&destroying_))
|
||||
return;
|
||||
|
||||
// Run this on |thread| to reduce the number of context switches.
|
||||
if (Thread::Current() != thread) {
|
||||
@ -65,11 +66,10 @@ void AsyncInvoker::DoInvoke(const Location& posted_from,
|
||||
Thread* thread,
|
||||
std::unique_ptr<AsyncClosure> closure,
|
||||
uint32_t id) {
|
||||
if (destroying_) {
|
||||
if (AtomicOps::AcquireLoad(&destroying_)) {
|
||||
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
||||
return;
|
||||
}
|
||||
AtomicOps::Increment(&pending_invocations_);
|
||||
thread->Post(posted_from, this, id,
|
||||
new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
||||
}
|
||||
@ -79,11 +79,10 @@ void AsyncInvoker::DoInvokeDelayed(const Location& posted_from,
|
||||
std::unique_ptr<AsyncClosure> closure,
|
||||
uint32_t delay_ms,
|
||||
uint32_t id) {
|
||||
if (destroying_) {
|
||||
if (AtomicOps::AcquireLoad(&destroying_)) {
|
||||
LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
|
||||
return;
|
||||
}
|
||||
AtomicOps::Increment(&pending_invocations_);
|
||||
thread->PostDelayed(posted_from, delay_ms, this, id,
|
||||
new ScopedMessageData<AsyncClosure>(std::move(closure)));
|
||||
}
|
||||
@ -111,6 +110,10 @@ void GuardedAsyncInvoker::ThreadDestroyed() {
|
||||
thread_ = nullptr;
|
||||
}
|
||||
|
||||
AsyncClosure::AsyncClosure(AsyncInvoker* invoker) : invoker_(invoker) {
|
||||
AtomicOps::Increment(&invoker_->pending_invocations_);
|
||||
}
|
||||
|
||||
AsyncClosure::~AsyncClosure() {
|
||||
AtomicOps::Decrement(&invoker_->pending_invocations_);
|
||||
invoker_->invocation_complete_.Set();
|
||||
|
||||
@ -120,7 +120,7 @@ class AsyncInvoker : public MessageHandler {
|
||||
uint32_t id);
|
||||
volatile int pending_invocations_ = 0;
|
||||
Event invocation_complete_;
|
||||
bool destroying_ = false;
|
||||
int destroying_ = 0;
|
||||
friend class AsyncClosure;
|
||||
|
||||
RTC_DISALLOW_COPY_AND_ASSIGN(AsyncInvoker);
|
||||
|
||||
@ -104,7 +104,7 @@ class MessageClient : public MessageHandler, public TestGenerator {
|
||||
Socket* socket_;
|
||||
};
|
||||
|
||||
class CustomThread : public rtc::Thread {
|
||||
class CustomThread : public Thread {
|
||||
public:
|
||||
CustomThread() {}
|
||||
virtual ~CustomThread() { Stop(); }
|
||||
@ -150,7 +150,7 @@ class SignalWhenDestroyedThread : public Thread {
|
||||
|
||||
// Using std::atomic<bool> or std::atomic_flag in C++11 is probably
|
||||
// the right thing to do, but those features are not yet allowed. Or
|
||||
// rtc::AtomicInt, if/when that is added. Since the use isn't
|
||||
// AtomicInt, if/when that is added. Since the use isn't
|
||||
// performance critical, use a plain critical section for the time
|
||||
// being.
|
||||
|
||||
@ -451,27 +451,23 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
|
||||
// executing, and then to wait for it to finish, ensuring the "EXPECT_FALSE"
|
||||
// is run.
|
||||
Event functor_started(false, false);
|
||||
Event functor_continue(false, false);
|
||||
Event functor_finished(false, false);
|
||||
|
||||
Thread thread;
|
||||
thread.Start();
|
||||
volatile bool invoker_destroyed = false;
|
||||
{
|
||||
auto functor = [&functor_started, &functor_finished, &invoker_destroyed] {
|
||||
functor_started.Set();
|
||||
Thread::Current()->SleepMs(kWaitTimeout);
|
||||
EXPECT_FALSE(invoker_destroyed);
|
||||
functor_finished.Set();
|
||||
};
|
||||
AsyncInvoker invoker;
|
||||
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread,
|
||||
[&functor_started, &functor_continue,
|
||||
&functor_finished, &invoker_destroyed] {
|
||||
functor_started.Set();
|
||||
functor_continue.Wait(Event::kForever);
|
||||
rtc::Thread::Current()->SleepMs(kWaitTimeout);
|
||||
EXPECT_FALSE(invoker_destroyed);
|
||||
functor_finished.Set();
|
||||
});
|
||||
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
|
||||
functor_started.Wait(Event::kForever);
|
||||
|
||||
// Allow the functor to continue and immediately destroy the invoker.
|
||||
functor_continue.Set();
|
||||
// Destroy the invoker while the functor is still executing (doing
|
||||
// SleepMs).
|
||||
}
|
||||
|
||||
// If the destructor DIDN'T wait for the functor to finish executing, it will
|
||||
@ -481,6 +477,35 @@ TEST_F(AsyncInvokeTest, KillInvokerDuringExecute) {
|
||||
functor_finished.Wait(Event::kForever);
|
||||
}
|
||||
|
||||
// Variant of the above test where the async-invoked task calls AsyncInvoke
|
||||
// again, for the thread on which the AsyncInvoker is currently being
|
||||
// destroyed. This shouldn't deadlock or crash; this second invocation should
|
||||
// just be ignored.
|
||||
TEST_F(AsyncInvokeTest, KillInvokerDuringExecuteWithReentrantInvoke) {
|
||||
Event functor_started(false, false);
|
||||
bool reentrant_functor_run = false;
|
||||
|
||||
Thread* main = Thread::Current();
|
||||
Thread thread;
|
||||
thread.Start();
|
||||
{
|
||||
AsyncInvoker invoker;
|
||||
auto reentrant_functor = [&reentrant_functor_run] {
|
||||
reentrant_functor_run = true;
|
||||
};
|
||||
auto functor = [&functor_started, &invoker, main, reentrant_functor] {
|
||||
functor_started.Set();
|
||||
Thread::Current()->SleepMs(kWaitTimeout);
|
||||
invoker.AsyncInvoke<void>(RTC_FROM_HERE, main, reentrant_functor);
|
||||
};
|
||||
// This queues a task on |thread| to sleep for |kWaitTimeout| then queue a
|
||||
// task on |main|. But this second queued task should never run.
|
||||
invoker.AsyncInvoke<void>(RTC_FROM_HERE, &thread, functor);
|
||||
functor_started.Wait(Event::kForever);
|
||||
}
|
||||
EXPECT_FALSE(reentrant_functor_run);
|
||||
}
|
||||
|
||||
TEST_F(AsyncInvokeTest, Flush) {
|
||||
AsyncInvoker invoker;
|
||||
AtomicBool flag1;
|
||||
|
||||
Reference in New Issue
Block a user