fixed resize thread pool return -4002 because unit cpu count is greater than 128.

This commit is contained in:
HaHaJeff
2023-07-24 12:12:42 +00:00
committed by ob-robot
parent 95018ccb1d
commit 6aa97ebc17
2 changed files with 30 additions and 21 deletions

View File

@ -141,8 +141,6 @@ int ObLogExternalStorageHandler::resize(const int64_t new_concurrency,
} else if (!is_running_) { } else if (!is_running_) {
ret = OB_NOT_RUNNING; ret = OB_NOT_RUNNING;
CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us)); CLOG_LOG(WARN, "ObLogExternalStorageHandler not running", KPC(this), K(new_concurrency), K(timeout_us));
} else if (new_concurrency == concurrency_) {
CLOG_LOG(TRACE, "no need resize", KPC(this), K(new_concurrency));
} else { } else {
do { do {
ret = resize_(new_concurrency); ret = resize_(new_concurrency);
@ -151,9 +149,7 @@ int ObLogExternalStorageHandler::resize(const int64_t new_concurrency,
} }
} while (OB_FAIL(ret)); } while (OB_FAIL(ret));
time_guard.click("after create new thread pool"); time_guard.click("after create new thread pool");
concurrency_ = new_concurrency; CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", KPC(this), K(new_concurrency));
capacity_ = CAPACITY_COEFFICIENT * new_concurrency;
CLOG_LOG(INFO, "ObLogExternalStorageHandler resize success", K(new_concurrency));
} }
} }
return ret; return ret;
@ -244,7 +240,7 @@ int64_t ObLogExternalStorageHandler::get_recommend_concurrency_in_single_file()
bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const bool ObLogExternalStorageHandler::is_valid_concurrency_(const int64_t concurrency) const
{ {
return 0 <= concurrency && CONCURRENCY_LIMIT >= concurrency; return 0 <= concurrency;
} }
int64_t ObLogExternalStorageHandler::get_async_task_count_(const int64_t total_size) const int64_t ObLogExternalStorageHandler::get_async_task_count_(const int64_t total_size) const
@ -400,13 +396,15 @@ int ObLogExternalStorageHandler::resize_(const int64_t new_concurrency)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
ObTimeGuard time_guard("resize impl", 10 * 1000); ObTimeGuard time_guard("resize impl", 10 * 1000);
int64_t real_concurrency = MIN(new_concurrency, CONCURRENCY_LIMIT);
if (OB_FAIL(ObSimpleThreadPool::set_thread_count(new_concurrency))) { if (real_concurrency == concurrency_) {
CLOG_LOG(WARN, "set_thread_count failed", K(new_concurrency), KPC(this)); CLOG_LOG(TRACE, "no need resize_", K(new_concurrency), K(real_concurrency), KPC(this));
} else if (OB_FAIL(ObSimpleThreadPool::set_thread_count(real_concurrency))) {
CLOG_LOG(WARN, "set_thread_count failed", K(new_concurrency), KPC(this), K(real_concurrency));
} else { } else {
CLOG_LOG_RET(INFO, OB_SUCCESS, "resize_ success", K(time_guard), KPC(this), K(new_concurrency)); CLOG_LOG_RET(INFO, OB_SUCCESS, "resize_ success", K(time_guard), KPC(this), K(new_concurrency), K(real_concurrency));
concurrency_ = new_concurrency; concurrency_ = real_concurrency;
capacity_ = CAPACITY_COEFFICIENT * new_concurrency; capacity_ = CAPACITY_COEFFICIENT * real_concurrency;
} }
time_guard.click("set thread count"); time_guard.click("set thread count");
return ret; return ret;

View File

@ -145,15 +145,16 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
// 测试invalid argument // 测试invalid argument
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.start(-1)); EXPECT_EQ(OB_INVALID_ARGUMENT, handler.start(-1));
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.start(ObLogExternalStorageHandler::CONCURRENCY_LIMIT+1)); // 当concurrency超过最大并发度时,以最大并发度为准
EXPECT_NE(OB_INVALID_ARGUMENT, handler.start(ObLogExternalStorageHandler::CONCURRENCY_LIMIT+1));
EXPECT_EQ(ObLogExternalStorageHandler::CONCURRENCY_LIMIT, handler.concurrency_);
EXPECT_EQ(ObLogExternalStorageHandler::CONCURRENCY_LIMIT*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT,
handler.capacity_);
EXPECT_EQ(true, handler.is_running_);
// start 成功 // 重复start
const int64_t concurrency = 16; const int64_t concurrency = 16;
EXPECT_EQ(OB_SUCCESS, handler.start(concurrency)); EXPECT_EQ(OB_SUCCESS, handler.start(concurrency));
EXPECT_EQ(true, handler.is_running_);
EXPECT_EQ(concurrency, handler.concurrency_);
EXPECT_EQ(concurrency*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT,
handler.capacity_);
// 验证读取——invalid argument // 验证读取——invalid argument
{ {
@ -178,15 +179,19 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(invalid_concurrency, 0)); EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(invalid_concurrency, 0));
int64_t invalid_timeout_us = 0; int64_t invalid_timeout_us = 0;
EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(concurrency, invalid_timeout_us)); EXPECT_EQ(OB_INVALID_ARGUMENT, handler.resize(concurrency, invalid_timeout_us));
} }
// 验证私有函数 // 验证私有函数
{ {
// 验证is_valid_concurrency_ EXPECT_EQ(OB_SUCCESS, handler.resize_(16));
// 验证is_valid_concurrency_为false
int64_t invalid_concurrency = -1; int64_t invalid_concurrency = -1;
EXPECT_EQ(false, handler.is_valid_concurrency_(invalid_concurrency)); EXPECT_EQ(false, handler.is_valid_concurrency_(invalid_concurrency));
// 当并发度超过128时,会将concurrency_设置为128.
invalid_concurrency = ObLogExternalStorageHandler::CONCURRENCY_LIMIT + 1; invalid_concurrency = ObLogExternalStorageHandler::CONCURRENCY_LIMIT + 1;
EXPECT_EQ(false, handler.is_valid_concurrency_(invalid_concurrency)); EXPECT_EQ(true, handler.is_valid_concurrency_(invalid_concurrency));
// 验证get_async_task_count_ // 验证get_async_task_count_
// 单个任务最小2M, 在concurrency足够的情况下,最多存在8个异步任务 // 单个任务最小2M, 在concurrency足够的情况下,最多存在8个异步任务
@ -218,6 +223,12 @@ TEST(TestLogExternalStorageHandler, test_log_external_storage_handler)
EXPECT_EQ(new_concurrency, handler.concurrency_); EXPECT_EQ(new_concurrency, handler.concurrency_);
EXPECT_EQ(new_concurrency*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT, EXPECT_EQ(new_concurrency*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT,
handler.capacity_); handler.capacity_);
new_concurrency = 129;
EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency));
EXPECT_EQ(ObLogExternalStorageHandler::CONCURRENCY_LIMIT, handler.concurrency_);
EXPECT_EQ(ObLogExternalStorageHandler::CONCURRENCY_LIMIT*ObLogExternalStorageHandler::CAPACITY_COEFFICIENT,
handler.capacity_);
new_concurrency = 0; new_concurrency = 0;
EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency)); EXPECT_EQ(OB_SUCCESS, handler.resize(new_concurrency));