in rtc::Thread remove special handling of the Dispose message

rtc::Thread::Dispose is only used in test code,
but complicates the main thread loop.

Bug: webrtc:8324
Change-Id: I2dccdadcdc932b9992958d1e70fb93d1879b7618
Reviewed-on: https://webrtc-review.googlesource.com/c/src/+/272821
Reviewed-by: Tomas Gunnarsson <tommi@webrtc.org>
Commit-Queue: Danil Chapovalov <danilchap@webrtc.org>
Cr-Commit-Position: refs/heads/main@{#37894}
This commit is contained in:
Danil Chapovalov
2022-08-24 18:35:45 +02:00
committed by WebRTC LUCI CQ
parent cfa44b8bca
commit 0bd166530d
6 changed files with 42 additions and 55 deletions

View File

@ -940,6 +940,7 @@ rtc_library("threading") {
"//third_party/abseil-cpp/absl/base:core_headers", "//third_party/abseil-cpp/absl/base:core_headers",
"//third_party/abseil-cpp/absl/cleanup", "//third_party/abseil-cpp/absl/cleanup",
"//third_party/abseil-cpp/absl/functional:any_invocable", "//third_party/abseil-cpp/absl/functional:any_invocable",
"//third_party/abseil-cpp/absl/memory",
"//third_party/abseil-cpp/absl/strings", "//third_party/abseil-cpp/absl/strings",
] ]
deps = [ deps = [

View File

@ -18,6 +18,7 @@
#include <memory> #include <memory>
#include "absl/algorithm/container.h" #include "absl/algorithm/container.h"
#include "absl/memory/memory.h"
#include "rtc_base/async_packet_socket.h" #include "rtc_base/async_packet_socket.h"
#include "rtc_base/async_tcp_socket.h" #include "rtc_base/async_tcp_socket.h"
#include "rtc_base/socket.h" #include "rtc_base/socket.h"
@ -61,7 +62,9 @@ class TestEchoServer : public sigslot::has_slots<> {
void OnClose(AsyncPacketSocket* socket, int err) { void OnClose(AsyncPacketSocket* socket, int err) {
ClientList::iterator it = absl::c_find(client_sockets_, socket); ClientList::iterator it = absl::c_find(client_sockets_, socket);
client_sockets_.erase(it); client_sockets_.erase(it);
Thread::Current()->Dispose(socket); // `OnClose` is triggered by socket Close callback, deleting `socket` while
// processing that callback might be unsafe.
Thread::Current()->PostTask([socket = absl::WrapUnique(socket)] {});
} }
typedef std::list<AsyncTCPSocket*> ClientList; typedef std::list<AsyncTCPSocket*> ClientList;

View File

@ -72,6 +72,8 @@ class ScopedAutoReleasePool {
namespace rtc { namespace rtc {
namespace { namespace {
using ::webrtc::TimeDelta;
struct AnyInvocableMessage final : public MessageData { struct AnyInvocableMessage final : public MessageData {
explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task) explicit AnyInvocableMessage(absl::AnyInvocable<void() &&> task)
: task(std::move(task)) {} : task(std::move(task)) {}
@ -216,19 +218,6 @@ void ThreadManager::ProcessAllMessageQueuesInternal() {
// that came before it were also dispatched. // that came before it were also dispatched.
std::atomic<int> queues_not_done(0); std::atomic<int> queues_not_done(0);
// This class is used so that whether the posted message is processed, or the
// message queue is simply cleared, queues_not_done gets decremented.
class ScopedIncrement : public MessageData {
public:
ScopedIncrement(std::atomic<int>* value) : value_(value) {
value_->fetch_add(1);
}
~ScopedIncrement() override { value_->fetch_sub(1); }
private:
std::atomic<int>* value_;
};
{ {
MarkProcessingCritScope cs(&crit_, &processing_); MarkProcessingCritScope cs(&crit_, &processing_);
for (Thread* queue : message_queues_) { for (Thread* queue : message_queues_) {
@ -238,8 +227,13 @@ void ThreadManager::ProcessAllMessageQueuesInternal() {
// or ignored. // or ignored.
continue; continue;
} }
queue->PostDelayed(RTC_FROM_HERE, 0, nullptr, MQID_DISPOSE, queues_not_done.fetch_add(1);
new ScopedIncrement(&queues_not_done)); // Whether the task is processed, or the thread is simply cleared,
// queues_not_done gets decremented.
absl::Cleanup sub = [&queues_not_done] { queues_not_done.fetch_sub(1); };
// Post delayed task instead of regular task to wait for all delayed tasks
// that are ready for processing.
queue->PostDelayedTask([sub = std::move(sub)] {}, TimeDelta::Zero());
} }
} }
@ -459,44 +453,27 @@ bool Thread::Get(Message* pmsg, int cmsWait, bool process_io) {
while (true) { while (true) {
// Check for posted events // Check for posted events
int64_t cmsDelayNext = kForever; int64_t cmsDelayNext = kForever;
bool first_pass = true; {
while (true) {
// All queue operations need to be locked, but nothing else in this loop // All queue operations need to be locked, but nothing else in this loop
// (specifically handling disposed message) can happen inside the crit. // can happen inside the crit.
// Otherwise, disposed MessageHandlers will cause deadlocks. CritScope cs(&crit_);
{ // Check for delayed messages that have been triggered and calculate the
CritScope cs(&crit_); // next trigger time.
// On the first pass, check for delayed messages that have been while (!delayed_messages_.empty()) {
// triggered and calculate the next trigger time. if (msCurrent < delayed_messages_.top().run_time_ms_) {
if (first_pass) { cmsDelayNext =
first_pass = false; TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
while (!delayed_messages_.empty()) {
if (msCurrent < delayed_messages_.top().run_time_ms_) {
cmsDelayNext =
TimeDiff(delayed_messages_.top().run_time_ms_, msCurrent);
break;
}
messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
}
}
// Pull a message off the message queue, if available.
if (messages_.empty()) {
break; break;
} else {
*pmsg = messages_.front();
messages_.pop_front();
} }
} // crit_ is released here. messages_.push_back(delayed_messages_.top().msg_);
delayed_messages_.pop();
// If this was a dispose message, delete it and skip it. }
if (MQID_DISPOSE == pmsg->message_id) { // Pull a message off the message queue, if available.
RTC_DCHECK(nullptr == pmsg->phandler); if (!messages_.empty()) {
delete pmsg->pdata; *pmsg = messages_.front();
*pmsg = Message(); messages_.pop_front();
continue; return true;
} }
return true;
} }
if (IsQuitting()) if (IsQuitting())

View File

@ -29,6 +29,7 @@
#endif #endif
#include "absl/base/attributes.h" #include "absl/base/attributes.h"
#include "absl/functional/any_invocable.h" #include "absl/functional/any_invocable.h"
#include "absl/memory/memory.h"
#include "api/function_view.h" #include "api/function_view.h"
#include "api/task_queue/task_queue_base.h" #include "api/task_queue/task_queue_base.h"
#include "api/units/time_delta.h" #include "api/units/time_delta.h"
@ -298,11 +299,11 @@ class RTC_LOCKABLE RTC_EXPORT Thread : public webrtc::TaskQueueBase {
} }
// Internally posts a message which causes the doomed object to be deleted // Internally posts a message which causes the doomed object to be deleted
// TODO(bugs.webrtc.org/8324): Delete when unused by dependencies.
template <class T> template <class T>
void Dispose(T* doomed) { void Dispose(T* doomed) {
if (doomed) { RTC_DCHECK(doomed);
Post(RTC_FROM_HERE, nullptr, MQID_DISPOSE, new DisposeData<T>(doomed)); PostTask([dommed = absl::WrapUnique(doomed)] {});
}
} }
bool IsCurrent() const; bool IsCurrent() const;

View File

@ -89,6 +89,7 @@ class DisposeData : public MessageData {
}; };
const uint32_t MQID_ANY = static_cast<uint32_t>(-1); const uint32_t MQID_ANY = static_cast<uint32_t>(-1);
// TODO(bugs.webrtc.org/8324): Delete when unused by dependencies.
const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2); const uint32_t MQID_DISPOSE = static_cast<uint32_t>(-2);
// No destructor // No destructor

View File

@ -558,7 +558,11 @@ TEST(ThreadTest, ThreeThreadsInvoke) {
class ThreadQueueTest : public ::testing::Test, public Thread { class ThreadQueueTest : public ::testing::Test, public Thread {
public: public:
ThreadQueueTest() : Thread(CreateDefaultSocketServer(), true) {} ThreadQueueTest() : Thread(CreateDefaultSocketServer(), true) {
RTC_DCHECK(Thread::Current() == nullptr);
ThreadManager::Instance()->SetCurrentThread(this);
}
~ThreadQueueTest() { ThreadManager::Instance()->SetCurrentThread(nullptr); }
bool IsLocked_Worker() { bool IsLocked_Worker() {
if (!CritForTest()->TryEnter()) { if (!CritForTest()->TryEnter()) {
return true; return true;
@ -644,7 +648,7 @@ class DeletedMessageHandler : public MessageHandlerAutoCleanup {
bool* deleted_; bool* deleted_;
}; };
TEST_F(ThreadQueueTest, DiposeHandlerWithPostedMessagePending) { TEST_F(ThreadQueueTest, DisposeHandlerWithPostedMessagePending) {
bool deleted = false; bool deleted = false;
DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted); DeletedMessageHandler* handler = new DeletedMessageHandler(&deleted);
// First, post a dispose. // First, post a dispose.