diff --git a/src/observer/ob_partition_location_updater.cpp b/src/observer/ob_partition_location_updater.cpp index 8c6b154187..f828887ef0 100644 --- a/src/observer/ob_partition_location_updater.cpp +++ b/src/observer/ob_partition_location_updater.cpp @@ -97,6 +97,26 @@ void ObPartitionLocationUpdater::destroy() wait(); } +int ObPartitionLocationUpdater::set_thread_cnt(const int64_t thread_cnt) +{ + int ret = OB_SUCCESS; + if (!inited_) { + ret = OB_NOT_INIT; + LOG_WARN("updater is not inited yet", KR(ret), K(thread_cnt)); + } else if (!lib::is_mini_mode() && thread_cnt > 0 && thread_cnt != thread_cnt_) { + if (OB_FAIL(sender_.set_thread_count(thread_cnt))) { + LOG_ERROR("fail to set sender's thread cnt", KR(ret), K(thread_cnt), K(thread_cnt_)); + } else if (OB_FAIL(receiver_.set_thread_count(thread_cnt))) { + LOG_ERROR("fail to set receiver's thread cnt", KR(ret), K(thread_cnt), K(thread_cnt_)); + } else { + LOG_INFO( + "[LOCATION] location updater change thread cnt", "pre_thread_cnt", thread_cnt_, "cur_thread_cnt", thread_cnt); + thread_cnt_ = thread_cnt; + } + } + return ret; +} + int ObPartitionLocationUpdater::submit_broadcast_task(const ObPartitionBroadcastTask& task) { int ret = OB_SUCCESS; diff --git a/src/observer/ob_partition_location_updater.h b/src/observer/ob_partition_location_updater.h index 72a7cf017a..30112ede92 100644 --- a/src/observer/ob_partition_location_updater.h +++ b/src/observer/ob_partition_location_updater.h @@ -66,6 +66,8 @@ public: obrpc::ObSrvRpcProxy*& srv_rpc_proxy, share::ObPartitionLocationCache*& location_cache, share::ObIAliveServerTracer& server_tracer); + int set_thread_cnt(const int64_t thread_cnt); + void stop(); void wait(); void destroy(); diff --git a/src/observer/ob_server_reload_config.cpp b/src/observer/ob_server_reload_config.cpp index 8ab051630f..3f470e8da2 100644 --- a/src/observer/ob_server_reload_config.cpp +++ b/src/observer/ob_server_reload_config.cpp @@ -29,6 +29,7 @@ #include "observer/ob_server.h" #include "storage/ob_partition_scheduler.h" #include "storage/ob_partition_migrator.h" +#include "observer/ob_service.h" using namespace oceanbase::lib; using namespace oceanbase::common; @@ -338,5 +339,13 @@ int ObServerReloadConfig::operator()() { OB_STORE_FILE.resize_file(GCONF.datafile_size, GCONF.datafile_disk_percentage); } + { + const int64_t location_thread_cnt = GCONF.location_refresh_thread_count; + if (OB_NOT_NULL(GCTX.ob_service_) && + OB_FAIL(GCTX.ob_service_->get_partition_location_updater().set_thread_cnt(location_thread_cnt))) { + real_ret = ret; + LOG_WARN("fail to set location updater's thread cnt", KR(ret), K(location_thread_cnt)); + } + } return real_ret; } diff --git a/src/observer/ob_service.h b/src/observer/ob_service.h index 918ab07a77..bead08ac30 100644 --- a/src/observer/ob_service.h +++ b/src/observer/ob_service.h @@ -286,6 +286,10 @@ public: int64_t get_partition_table_updater_user_queue_size() const; int64_t get_partition_table_updater_sys_queue_size() const; int64_t get_partition_table_updater_core_queue_size() const; + ObPartitionLocationUpdater &get_partition_location_updater() + { + return partition_location_updater_; + } int get_all_partition_status(int64_t& inactive_num, int64_t& total_num) const; int get_root_server_status(obrpc::ObGetRootserverRoleResult& get_role_result); diff --git a/src/observer/ob_uniq_task_queue.h b/src/observer/ob_uniq_task_queue.h index 557f5a2800..411a159962 100644 --- a/src/observer/ob_uniq_task_queue.h +++ b/src/observer/ob_uniq_task_queue.h @@ -305,7 +305,7 @@ void ObUniqTaskQueue::run1() ret = common::OB_NOT_INIT; SERVER_LOG(WARN, "not init", K(ret)); } else { - while (!has_set_stop()) { + while (!lib::this_thread::has_set_stop()) { Task* t = NULL; tasks.reuse(); if (OB_SUCC(tasks.reserve(batch_exec_cnt))) { @@ -431,10 +431,11 @@ template int ObUniqTaskQueue::process_barrier(Task& task) { int ret = common::OB_SUCCESS; + bool stopped = lib::this_thread::has_set_stop(); if (OB_ISNULL(updater_)) { ret = common::OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid updater", K(ret), K(updater_)); - } else if (OB_FAIL(updater_->process_barrier(task, has_set_stop()))) { + } else if (OB_FAIL(updater_->process_barrier(task, stopped))) { SERVER_LOG(WARN, "fail to batch process task", K(ret)); } return ret; @@ -444,12 +445,13 @@ template int ObUniqTaskQueue::batch_process_tasks(common::ObIArray& tasks) { int ret = common::OB_SUCCESS; + bool stopped = lib::this_thread::has_set_stop(); if (0 == tasks.count()) { // nothing todo } else if (OB_ISNULL(updater_)) { ret = common::OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "invalid updater", K(ret), K(updater_)); - } else if (OB_FAIL(updater_->batch_process_tasks(tasks, has_set_stop()))) { + } else if (OB_FAIL(updater_->batch_process_tasks(tasks, stopped))) { SERVER_LOG(WARN, "fail to batch process task", K(ret)); } return ret; diff --git a/src/share/partition_table/ob_partition_location_cache.cpp b/src/share/partition_table/ob_partition_location_cache.cpp index 1f4470f92b..0a1159ab0a 100644 --- a/src/share/partition_table/ob_partition_location_cache.cpp +++ b/src/share/partition_table/ob_partition_location_cache.cpp @@ -2002,6 +2002,24 @@ int ObLocationAsyncUpdateQueueSet::init(ObServerConfig& config) return ret; } +int ObLocationAsyncUpdateQueueSet::set_thread_count(const int64_t thread_cnt) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("not init", KR(ret)); + } else if (!lib::is_mini_mode() && thread_cnt > 0) { + if (OB_FAIL(tenant_space_update_queue_.set_thread_count(thread_cnt))) { + LOG_ERROR("fail to set thread count", KR(ret), K(thread_cnt)); + } else if (OB_FAIL(user_update_queue_.set_thread_count(thread_cnt))) { + LOG_ERROR("fail to set thread count", KR(ret), K(thread_cnt)); + } else { + LOG_INFO("[LOCATION] location queue may change thread cnt", K(thread_cnt)); + } + } + return ret; +} + int ObLocationAsyncUpdateQueueSet::add_task(const ObLocationAsyncUpdateTask& task) { int ret = OB_SUCCESS; @@ -2181,6 +2199,13 @@ int ObPartitionLocationCache::reload_config() LOG_WARN("not init", K(ret)); } else { sem_.set_max_count(config_->location_fetch_concurrency); + int64_t thread_cnt = config_->location_refresh_thread_count; + int tmp_ret = OB_SUCCESS; + if (OB_SUCCESS != (tmp_ret = local_async_queue_set_.set_thread_count(thread_cnt))) { + LOG_WARN("fail to set thread count", KR(tmp_ret), K(thread_cnt)); + } else if (OB_SUCCESS != (tmp_ret = remote_async_queue_set_.set_thread_count(thread_cnt))) { + LOG_WARN("fail to set thread count", KR(tmp_ret), K(thread_cnt)); + } } return ret; } diff --git a/src/share/partition_table/ob_partition_location_cache.h b/src/share/partition_table/ob_partition_location_cache.h index 1596e5f88f..0a7cce686d 100644 --- a/src/share/partition_table/ob_partition_location_cache.h +++ b/src/share/partition_table/ob_partition_location_cache.h @@ -629,6 +629,7 @@ public: ObLocationAsyncUpdateQueueSet(ObIPartitionLocationCache* loc_cache_); virtual ~ObLocationAsyncUpdateQueueSet(); int init(common::ObServerConfig& config); + int set_thread_count(const int64_t thread_cnt); int add_task(const ObLocationAsyncUpdateTask& task); void stop(); void wait();