From b686205b978fb050bd2e90ec002dcd3d64d69402 Mon Sep 17 00:00:00 2001 From: luozenglin <37725793+luozenglin@users.noreply.github.com> Date: Wed, 12 May 2021 10:59:53 +0800 Subject: [PATCH] [Optimize] Reduce lock conflicts in ThreadResourceMgr of be (#5772) Removed some useless code that caused lock conflicts in ThreadResourceMgr of be. --- be/src/runtime/thread_resource_mgr.cpp | 25 ++------------ be/src/runtime/thread_resource_mgr.h | 36 +------------------- be/test/runtime/thread_resource_mgr_test.cpp | 27 --------------- 3 files changed, 3 insertions(+), 85 deletions(-) diff --git a/be/src/runtime/thread_resource_mgr.cpp b/be/src/runtime/thread_resource_mgr.cpp index de15d4c8aa..7575a4d87f 100644 --- a/be/src/runtime/thread_resource_mgr.cpp +++ b/be/src/runtime/thread_resource_mgr.cpp @@ -57,7 +57,6 @@ ThreadResourceMgr::ResourcePool::ResourcePool(ThreadResourceMgr* parent) : _pare void ThreadResourceMgr::ResourcePool::reset() { _num_threads = 0; _num_reserved_optional_threads = 0; - _thread_available_fn = NULL; _max_quota = INT_MAX; } @@ -83,7 +82,7 @@ ThreadResourceMgr::ResourcePool* ThreadResourceMgr::register_pool() { pool->reset(); // Added a new pool, update the quotas for each pool. - update_pool_quotas(pool); + update_pool_quotas(); return pool; } @@ -98,32 +97,12 @@ void ThreadResourceMgr::unregister_pool(ResourcePool* pool) { } } -void ThreadResourceMgr::ResourcePool::set_thread_available_cb(thread_available_cb fn) { - std::unique_lock l(_lock); - DCHECK(_thread_available_fn == NULL || fn == NULL); - _thread_available_fn = fn; -} - -void ThreadResourceMgr::update_pool_quotas(ResourcePool* new_pool) { +void ThreadResourceMgr::update_pool_quotas() { if (_pools.empty()) { return; } _per_pool_quota = ceil(static_cast(_system_threads_quota) / _pools.size()); - - for (Pools::iterator it = _pools.begin(); it != _pools.end(); ++it) { - ResourcePool* pool = *it; - - if (pool == new_pool) { - continue; - } - - std::unique_lock l(pool->_lock); - - if (pool->num_available_threads() > 0 && pool->_thread_available_fn != NULL) { - pool->_thread_available_fn(pool); - } - } } } // namespace doris diff --git a/be/src/runtime/thread_resource_mgr.h b/be/src/runtime/thread_resource_mgr.h index ca8840e836..02c3584f31 100644 --- a/be/src/runtime/thread_resource_mgr.h +++ b/be/src/runtime/thread_resource_mgr.h @@ -117,14 +117,6 @@ public: // Must not be called from from thread_available_cb. void release_thread_token(bool required); - // Add a callback to be notified when a thread is available. - // 'arg' is opaque and passed directly to the callback. - // The previous callback is no longer notified. - // TODO: rethink this. How we do coordinate when we have multiple places in - // the execution that all need threads (e.g. do we use that thread for - // the scanner or for the join). - void set_thread_available_cb(thread_available_cb fn); - // Returns the number of threads that are from acquire_thread_token. int num_required_threads() const { return _num_threads & 0xFFFFFFFF; } @@ -170,13 +162,6 @@ public: // swap operations. The number of required threads is the lower // 32 bits and the number of optional threads is the upper 32 bits. int64_t _num_threads; - - // Lock for the fields below. This lock is taken when the callback - // function is called. - // TODO: reconsider this. - std::mutex _lock; - - thread_available_cb _thread_available_fn; }; // Create a thread mgr object. If threads_quota is non-zero, it will be @@ -214,12 +199,7 @@ private: // Recycled list of pool objects std::list _free_pool_objs; - // Updates the per pool quota and notifies any pools that now have - // more threads they can use. Must be called with _lock taken. - // If new_pool is non-null, new_pool will *not* be notified. - void update_pool_quotas(ResourcePool* new_pool); - - void update_pool_quotas() { update_pool_quotas(NULL); } + void update_pool_quotas(); }; inline void ThreadResourceMgr::ResourcePool::acquire_thread_token() { @@ -265,20 +245,6 @@ inline void ThreadResourceMgr::ResourcePool::release_thread_token(bool required) } } } - - // We need to grab a lock before issuing the callback to prevent the - // callback from being removed while it is happening. - // Note: this is unlikely to be a big deal for performance currently - // since only scanner threads call this with any frequency and that only - // happens once when the scanner thread is complete. - // TODO: reconsider this. - if (num_available_threads() > 0 && _thread_available_fn != NULL) { - std::unique_lock l(_lock); - - if (num_available_threads() > 0 && _thread_available_fn != NULL) { - _thread_available_fn(this); - } - } } } // namespace doris diff --git a/be/test/runtime/thread_resource_mgr_test.cpp b/be/test/runtime/thread_resource_mgr_test.cpp index 711852fdb5..39b9834b96 100644 --- a/be/test/runtime/thread_resource_mgr_test.cpp +++ b/be/test/runtime/thread_resource_mgr_test.cpp @@ -26,42 +26,20 @@ namespace doris { -class NotifiedCounter { -public: - NotifiedCounter() : _counter(0) {} - - void Notify(ThreadResourceMgr::ResourcePool* consumer) { - DCHECK(consumer != NULL); - DCHECK_LT(consumer->num_threads(), consumer->quota()); - ++_counter; - } - - int counter() const { return _counter; } - -private: - int _counter; -}; - TEST(ThreadResourceMgr, BasicTest) { ThreadResourceMgr mgr(5); - NotifiedCounter counter1; - NotifiedCounter counter2; ThreadResourceMgr::ResourcePool* c1 = mgr.register_pool(); - c1->set_thread_available_cb( - std::bind(std::mem_fn(&NotifiedCounter::Notify), &counter1, _1)); c1->acquire_thread_token(); c1->acquire_thread_token(); c1->acquire_thread_token(); EXPECT_EQ(c1->num_threads(), 3); EXPECT_EQ(c1->num_required_threads(), 3); EXPECT_EQ(c1->num_optional_threads(), 0); - EXPECT_EQ(counter1.counter(), 0); c1->release_thread_token(true); EXPECT_EQ(c1->num_threads(), 2); EXPECT_EQ(c1->num_required_threads(), 2); EXPECT_EQ(c1->num_optional_threads(), 0); - EXPECT_EQ(counter1.counter(), 1); EXPECT_TRUE(c1->try_acquire_thread_token()); EXPECT_TRUE(c1->try_acquire_thread_token()); EXPECT_TRUE(c1->try_acquire_thread_token()); @@ -71,12 +49,9 @@ TEST(ThreadResourceMgr, BasicTest) { EXPECT_EQ(c1->num_optional_threads(), 3); c1->release_thread_token(true); c1->release_thread_token(false); - EXPECT_EQ(counter1.counter(), 3); // Register a new consumer, quota is cut in half ThreadResourceMgr::ResourcePool* c2 = mgr.register_pool(); - c2->set_thread_available_cb( - std::bind(std::mem_fn(&NotifiedCounter::Notify), &counter2, _1)); EXPECT_FALSE(c1->try_acquire_thread_token()); EXPECT_EQ(c1->num_threads(), 3); c1->acquire_thread_token(); @@ -86,8 +61,6 @@ TEST(ThreadResourceMgr, BasicTest) { mgr.unregister_pool(c1); mgr.unregister_pool(c2); - EXPECT_EQ(counter1.counter(), 3); - EXPECT_EQ(counter2.counter(), 1); } } // namespace doris