Fix the problem of non-blocking shrinking not being enabled
This commit is contained in:
@ -364,6 +364,12 @@ int ObSimpleDynamicThreadPool::init(const int64_t thread_num, const char* name,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ObSimpleDynamicThreadPool::stop()
|
||||||
|
{
|
||||||
|
IGNORE_RETURN ObSimpleThreadPoolDynamicMgr::get_instance().unbind(this);
|
||||||
|
lib::ThreadPool::stop();
|
||||||
|
}
|
||||||
|
|
||||||
void ObSimpleDynamicThreadPool::destroy()
|
void ObSimpleDynamicThreadPool::destroy()
|
||||||
{
|
{
|
||||||
if (min_thread_cnt_ < max_thread_cnt_) {
|
if (min_thread_cnt_ < max_thread_cnt_) {
|
||||||
@ -437,7 +443,7 @@ int ObSimpleDynamicThreadPool::set_max_thread_count(int64_t max_thread_cnt)
|
|||||||
int ObSimpleDynamicThreadPool::set_thread_count_and_try_recycle(int64_t cnt)
|
int ObSimpleDynamicThreadPool::set_thread_count_and_try_recycle(int64_t cnt)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ret = Threads::set_thread_count(cnt);
|
ret = Threads::do_set_thread_count(cnt, true/*async_recycle*/);
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
ret = Threads::try_thread_recycle();
|
ret = Threads::try_thread_recycle();
|
||||||
}
|
}
|
||||||
@ -623,6 +629,7 @@ int ObSimpleThreadPoolDynamicMgr::bind(ObSimpleDynamicThreadPool *pool)
|
|||||||
if (OB_FAIL(simple_thread_pool_list_.push_back(pool_stat))) {
|
if (OB_FAIL(simple_thread_pool_list_.push_back(pool_stat))) {
|
||||||
COMMON_LOG(WARN, "bind simple thread pool faild", KP(pool));
|
COMMON_LOG(WARN, "bind simple thread pool faild", KP(pool));
|
||||||
} else {
|
} else {
|
||||||
|
pool->has_bind_ = true;
|
||||||
COMMON_LOG(INFO, "bind simple thread pool success", K(*pool));
|
COMMON_LOG(INFO, "bind simple thread pool success", K(*pool));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
@ -634,6 +641,8 @@ int ObSimpleThreadPoolDynamicMgr::unbind(ObSimpleDynamicThreadPool *pool)
|
|||||||
if (OB_UNLIKELY(NULL == pool)) {
|
if (OB_UNLIKELY(NULL == pool)) {
|
||||||
ret = OB_INVALID_ARGUMENT;
|
ret = OB_INVALID_ARGUMENT;
|
||||||
COMMON_LOG(WARN, "unbind pool failed");
|
COMMON_LOG(WARN, "unbind pool failed");
|
||||||
|
} else if (!pool->has_bind_) {
|
||||||
|
// do-nothing
|
||||||
} else {
|
} else {
|
||||||
SpinWLockGuard guard(simple_thread_pool_list_lock_);
|
SpinWLockGuard guard(simple_thread_pool_list_lock_);
|
||||||
int64_t idx = -1;
|
int64_t idx = -1;
|
||||||
@ -646,6 +655,7 @@ int ObSimpleThreadPoolDynamicMgr::unbind(ObSimpleDynamicThreadPool *pool)
|
|||||||
if ((-1 != idx) && OB_FAIL(simple_thread_pool_list_.remove(idx))) {
|
if ((-1 != idx) && OB_FAIL(simple_thread_pool_list_.remove(idx))) {
|
||||||
COMMON_LOG(WARN, "failed to remove simple_thread_pool", K(ret), K(idx), KP(pool));
|
COMMON_LOG(WARN, "failed to remove simple_thread_pool", K(ret), K(idx), KP(pool));
|
||||||
} else {
|
} else {
|
||||||
|
pool->has_bind_ = false;
|
||||||
COMMON_LOG(INFO, "try to unbind simple thread pool", K(*pool), K(idx));
|
COMMON_LOG(INFO, "try to unbind simple thread pool", K(*pool), K(idx));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -92,14 +92,16 @@ private:
|
|||||||
class ObSimpleDynamicThreadPool
|
class ObSimpleDynamicThreadPool
|
||||||
: public lib::ThreadPool
|
: public lib::ThreadPool
|
||||||
{
|
{
|
||||||
|
friend class ObSimpleThreadPoolDynamicMgr;
|
||||||
public:
|
public:
|
||||||
static const int64_t MAX_THREAD_NUM = 1024;
|
static const int64_t MAX_THREAD_NUM = 1024;
|
||||||
ObSimpleDynamicThreadPool()
|
ObSimpleDynamicThreadPool()
|
||||||
: min_thread_cnt_(OB_INVALID_COUNT), max_thread_cnt_(OB_INVALID_COUNT),
|
: has_bind_(false), min_thread_cnt_(OB_INVALID_COUNT), max_thread_cnt_(OB_INVALID_COUNT),
|
||||||
running_thread_cnt_(0), threads_idle_time_(0), update_threads_lock_(), ref_cnt_(0), name_("unknown"), tenant_id_(OB_SERVER_TENANT_ID)
|
running_thread_cnt_(0), threads_idle_time_(0), update_threads_lock_(), ref_cnt_(0), name_("unknown"), tenant_id_(OB_SERVER_TENANT_ID)
|
||||||
{}
|
{}
|
||||||
virtual ~ObSimpleDynamicThreadPool();
|
virtual ~ObSimpleDynamicThreadPool();
|
||||||
int init(const int64_t thread_num, const char* name, const int64_t tenant_id);
|
int init(const int64_t thread_num, const char* name, const int64_t tenant_id);
|
||||||
|
virtual void stop() override;
|
||||||
void destroy();
|
void destroy();
|
||||||
int set_adaptive_thread(int64_t min_thread_num, int64_t max_thread_num);
|
int set_adaptive_thread(int64_t min_thread_num, int64_t max_thread_num);
|
||||||
virtual int64_t get_queue_num() const = 0;
|
virtual int64_t get_queue_num() const = 0;
|
||||||
@ -126,6 +128,7 @@ protected:
|
|||||||
}
|
}
|
||||||
int set_thread_count_and_try_recycle(int64_t cnt);
|
int set_thread_count_and_try_recycle(int64_t cnt);
|
||||||
private:
|
private:
|
||||||
|
bool has_bind_;
|
||||||
int64_t min_thread_cnt_;
|
int64_t min_thread_cnt_;
|
||||||
int64_t max_thread_cnt_;
|
int64_t max_thread_cnt_;
|
||||||
int64_t running_thread_cnt_;
|
int64_t running_thread_cnt_;
|
||||||
|
|||||||
@ -11,7 +11,9 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
|
#define private public
|
||||||
#include "lib/thread/ob_simple_thread_pool.h"
|
#include "lib/thread/ob_simple_thread_pool.h"
|
||||||
|
#undef private
|
||||||
#include "lib/coro/testing.h"
|
#include "lib/coro/testing.h"
|
||||||
|
|
||||||
using namespace oceanbase::common;
|
using namespace oceanbase::common;
|
||||||
@ -75,6 +77,21 @@ TEST(DISABLED_TestSimpleThreadPool, Basic)
|
|||||||
pool.destroy();
|
pool.destroy();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
TEST(TestSimpleThreadPool, test_dynamic_simple_thread_pool_bind)
|
||||||
|
{
|
||||||
|
class ObTestSimpleThreadPool : public ObSimpleThreadPool {
|
||||||
|
void handle(void *) {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
int ret = ObSimpleThreadPoolDynamicMgr::get_instance().init();
|
||||||
|
ASSERT_EQ(ret, OB_SUCCESS);
|
||||||
|
ObTestSimpleThreadPool pool;
|
||||||
|
ret = pool.set_adaptive_thread(1, 3);
|
||||||
|
ASSERT_EQ(ret, OB_SUCCESS);
|
||||||
|
ASSERT_TRUE(pool.has_bind_);
|
||||||
|
pool.stop();
|
||||||
|
ASSERT_FALSE(pool.has_bind_);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(TestSimpleThreadPool, DISABLED_test_dynamic_simple_thread_pool)
|
TEST(TestSimpleThreadPool, DISABLED_test_dynamic_simple_thread_pool)
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user