diff --git a/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.cpp b/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.cpp index d5636f99b..1c6693d21 100644 --- a/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.cpp +++ b/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.cpp @@ -364,6 +364,12 @@ int ObSimpleDynamicThreadPool::init(const int64_t thread_num, const char* name, return ret; } +void ObSimpleDynamicThreadPool::stop() +{ + IGNORE_RETURN ObSimpleThreadPoolDynamicMgr::get_instance().unbind(this); + lib::ThreadPool::stop(); +} + void ObSimpleDynamicThreadPool::destroy() { if (min_thread_cnt_ < max_thread_cnt_) { @@ -437,7 +443,7 @@ int ObSimpleDynamicThreadPool::set_max_thread_count(int64_t max_thread_cnt) int ObSimpleDynamicThreadPool::set_thread_count_and_try_recycle(int64_t cnt) { int ret = OB_SUCCESS; - ret = Threads::set_thread_count(cnt); + ret = Threads::do_set_thread_count(cnt, true/*async_recycle*/); if (OB_SUCC(ret)) { ret = Threads::try_thread_recycle(); } @@ -623,6 +629,7 @@ int ObSimpleThreadPoolDynamicMgr::bind(ObSimpleDynamicThreadPool *pool) if (OB_FAIL(simple_thread_pool_list_.push_back(pool_stat))) { COMMON_LOG(WARN, "bind simple thread pool faild", KP(pool)); } else { + pool->has_bind_ = true; COMMON_LOG(INFO, "bind simple thread pool success", K(*pool)); } return ret; @@ -634,6 +641,8 @@ int ObSimpleThreadPoolDynamicMgr::unbind(ObSimpleDynamicThreadPool *pool) if (OB_UNLIKELY(NULL == pool)) { ret = OB_INVALID_ARGUMENT; COMMON_LOG(WARN, "unbind pool failed"); + } else if (!pool->has_bind_) { + // do-nothing } else { SpinWLockGuard guard(simple_thread_pool_list_lock_); int64_t idx = -1; @@ -646,6 +655,7 @@ int ObSimpleThreadPoolDynamicMgr::unbind(ObSimpleDynamicThreadPool *pool) if ((-1 != idx) && OB_FAIL(simple_thread_pool_list_.remove(idx))) { COMMON_LOG(WARN, "failed to remove simple_thread_pool", K(ret), K(idx), KP(pool)); } else { + pool->has_bind_ = false; COMMON_LOG(INFO, "try to unbind simple thread pool", K(*pool), K(idx)); } } diff --git a/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.h b/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.h index 252a58c04..9bd0531d3 100644 --- a/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.h +++ b/deps/oblib/src/lib/thread/ob_dynamic_thread_pool.h @@ -92,14 +92,16 @@ private: class ObSimpleDynamicThreadPool : public lib::ThreadPool { + friend class ObSimpleThreadPoolDynamicMgr; public: static const int64_t MAX_THREAD_NUM = 1024; ObSimpleDynamicThreadPool() - : min_thread_cnt_(OB_INVALID_COUNT), max_thread_cnt_(OB_INVALID_COUNT), + : has_bind_(false), min_thread_cnt_(OB_INVALID_COUNT), max_thread_cnt_(OB_INVALID_COUNT), running_thread_cnt_(0), threads_idle_time_(0), update_threads_lock_(), ref_cnt_(0), name_("unknown"), tenant_id_(OB_SERVER_TENANT_ID) {} virtual ~ObSimpleDynamicThreadPool(); int init(const int64_t thread_num, const char* name, const int64_t tenant_id); + virtual void stop() override; void destroy(); int set_adaptive_thread(int64_t min_thread_num, int64_t max_thread_num); virtual int64_t get_queue_num() const = 0; @@ -126,6 +128,7 @@ protected: } int set_thread_count_and_try_recycle(int64_t cnt); private: + bool has_bind_; int64_t min_thread_cnt_; int64_t max_thread_cnt_; int64_t running_thread_cnt_; diff --git a/deps/oblib/unittest/lib/thread/test_simple_thread_pool.cpp b/deps/oblib/unittest/lib/thread/test_simple_thread_pool.cpp index 681af976b..089e66e4d 100644 --- a/deps/oblib/unittest/lib/thread/test_simple_thread_pool.cpp +++ b/deps/oblib/unittest/lib/thread/test_simple_thread_pool.cpp @@ -11,7 +11,9 @@ */ #include +#define private public #include "lib/thread/ob_simple_thread_pool.h" +#undef private #include "lib/coro/testing.h" using namespace oceanbase::common; @@ -75,6 +77,21 @@ TEST(DISABLED_TestSimpleThreadPool, Basic) pool.destroy(); } +TEST(TestSimpleThreadPool, test_dynamic_simple_thread_pool_bind) +{ + class ObTestSimpleThreadPool : public ObSimpleThreadPool { + void handle(void *) { + } + }; + int ret = ObSimpleThreadPoolDynamicMgr::get_instance().init(); + ASSERT_EQ(ret, OB_SUCCESS); + ObTestSimpleThreadPool pool; + ret = pool.set_adaptive_thread(1, 3); + ASSERT_EQ(ret, OB_SUCCESS); + ASSERT_TRUE(pool.has_bind_); + pool.stop(); + ASSERT_FALSE(pool.has_bind_); +} TEST(TestSimpleThreadPool, DISABLED_test_dynamic_simple_thread_pool) {