From 5e15f1b92a550480d4acf11435cc6cfc35f173f0 Mon Sep 17 00:00:00 2001 From: coolfishchen Date: Thu, 13 Jul 2023 14:48:12 +0000 Subject: [PATCH] rename table load thread name Co-authored-by: suz-yang --- .../table_load/ob_table_load_client_task.cpp | 3 ++- .../ob_table_load_coordinator_ctx.cpp | 3 ++- .../table_load/ob_table_load_mem_compactor.cpp | 4 ++-- .../table_load/ob_table_load_store_ctx.cpp | 2 +- .../ob_table_load_task_scheduler.cpp | 18 ++++++++++++------ .../table_load/ob_table_load_task_scheduler.h | 6 ++++-- .../engine/cmd/ob_load_data_direct_impl.cpp | 2 +- 7 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 37bde99b46..12c0e8ace4 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -19,6 +19,7 @@ #include "observer/table_load/ob_table_load_table_ctx.h" #include "observer/table_load/ob_table_load_task_scheduler.h" #include "observer/table_load/ob_table_load_utils.h" +#include "observer/table_load/ob_table_load_task.h" namespace oceanbase { @@ -99,7 +100,7 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t t } else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) { LOG_WARN("fail to init task allocator", KR(ret)); } else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, - (&allocator_), 1, allocator_))) { + (&allocator_), 1, table_id, "Client"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) { diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index b002979b4e..b250adcbb4 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -129,7 +129,8 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, } // init task_scheduler_ else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), - ctx_->param_.session_count_, allocator_))) { + ctx_->param_.session_count_, + ctx_->param_.table_id_, "Coordinator"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index f4185dc08b..1de9773237 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -292,8 +292,8 @@ int ObTableLoadMemCompactor::init_scheduler() { int ret = OB_SUCCESS; // 初始化task_scheduler_ - if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), - 1, allocator_))) { + if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1, + param_->table_id_, "MemCompact"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) { diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 57fdc8a356..adc306517d 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -166,7 +166,7 @@ int ObTableLoadStoreCtx::init( } // 初始化task_scheduler_ else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), - ctx_->param_.session_count_, allocator_))) { + ctx_->param_.session_count_, ctx_->param_.table_id_, "Store"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) { diff --git a/src/observer/table_load/ob_table_load_task_scheduler.cpp b/src/observer/table_load/ob_table_load_task_scheduler.cpp index e37fd20aad..8e3bd3dd9f 100644 --- a/src/observer/table_load/ob_table_load_task_scheduler.cpp +++ b/src/observer/table_load/ob_table_load_task_scheduler.cpp @@ -48,9 +48,10 @@ void ObTableLoadTaskThreadPoolScheduler::MyThreadPool::run1() } ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t thread_count, - ObIAllocator &allocator, + uint64_t table_id, + const char *label, int64_t session_queue_size) - : allocator_(allocator), + : allocator_("TLD_ThreadPool"), thread_count_(thread_count), session_queue_size_(session_queue_size), timeout_ts_(INT64_MAX), @@ -59,6 +60,7 @@ ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t t state_(STATE_ZERO), is_inited_(false) { + snprintf(name_, OB_THREAD_NAME_BUF_LEN, "TLD_%03ld_%s", table_id % 1000, label); } ObTableLoadTaskThreadPoolScheduler::~ObTableLoadTaskThreadPoolScheduler() @@ -101,9 +103,8 @@ int ObTableLoadTaskThreadPoolScheduler::init() if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadTaskThreadPoolScheduler init twice", KR(ret), KP(this)); - } else if (OB_FAIL(init_worker_ctx_array())) { - LOG_WARN("fail to init worker ctx array", KR(ret)); } else { + allocator_.set_tenant_id(MTL_ID()); thread_pool_.set_thread_count(thread_count_); thread_pool_.set_run_wrapper(MTL_CTX()); ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id(); @@ -115,7 +116,11 @@ int ObTableLoadTaskThreadPoolScheduler::init() trace_id_.init(zero_addr); } timeout_ts_ = THIS_WORKER.get_timeout_ts(); - is_inited_ = true; + if (OB_FAIL(init_worker_ctx_array())) { + LOG_WARN("fail to init worker ctx array", KR(ret)); + } else { + is_inited_ = true; + } } return ret; } @@ -203,7 +208,8 @@ void ObTableLoadTaskThreadPoolScheduler::after_running() void ObTableLoadTaskThreadPoolScheduler::run(uint64_t thread_idx) { int ret = OB_SUCCESS; - + // set thread name + lib::set_thread_name(name_); // set trace id ObCurTraceId::set(trace_id_); // set worker timeout diff --git a/src/observer/table_load/ob_table_load_task_scheduler.h b/src/observer/table_load/ob_table_load_task_scheduler.h index 19f92b5238..daf0a793d9 100644 --- a/src/observer/table_load/ob_table_load_task_scheduler.h +++ b/src/observer/table_load/ob_table_load_task_scheduler.h @@ -9,6 +9,7 @@ #include "lib/profile/ob_trace_id.h" #include "lib/queue/ob_lighty_queue.h" #include "share/ob_thread_pool.h" +#include "lib/allocator/page_arena.h" namespace oceanbase { @@ -42,7 +43,7 @@ class ObTableLoadTaskThreadPoolScheduler final : public ObITableLoadTaskSchedule static const int STATE_STOPPED = 4; static const int STATE_STOPPED_NO_WAIT = 5; public: - ObTableLoadTaskThreadPoolScheduler(int64_t thread_count, common::ObIAllocator &allocator, + ObTableLoadTaskThreadPoolScheduler(int64_t thread_count, uint64_t table_id, const char *label, int64_t session_queue_size = 64); virtual ~ObTableLoadTaskThreadPoolScheduler(); int init() override; @@ -85,9 +86,10 @@ private: }; int execute_worker_tasks(WorkerContext &worker_ctx); private: - common::ObIAllocator &allocator_; + common::ObArenaAllocator allocator_; const int64_t thread_count_; const int64_t session_queue_size_; + char name_[OB_THREAD_NAME_BUF_LEN]; common::ObCurTraceId::TraceId trace_id_; int64_t timeout_ts_; MyThreadPool thread_pool_; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 9b86e2eb4d..04822ab19e 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1052,7 +1052,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &e // init task_scheduler_ else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_), - worker_count_, *execute_ctx_->allocator_))) { + worker_count_, execute_param_->table_id_, "Parse"))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) {