rename table load thread name
Co-authored-by: suz-yang <suz.yang@foxmail.com>
This commit is contained in:
@ -19,6 +19,7 @@
|
|||||||
#include "observer/table_load/ob_table_load_table_ctx.h"
|
#include "observer/table_load/ob_table_load_table_ctx.h"
|
||||||
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
#include "observer/table_load/ob_table_load_task_scheduler.h"
|
||||||
#include "observer/table_load/ob_table_load_utils.h"
|
#include "observer/table_load/ob_table_load_utils.h"
|
||||||
|
#include "observer/table_load/ob_table_load_task.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -99,7 +100,7 @@ int ObTableLoadClientTask::init(uint64_t tenant_id, uint64_t user_id, uint64_t t
|
|||||||
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) {
|
} else if (OB_FAIL(task_allocator_.init("TLD_TaskPool", MTL_ID()))) {
|
||||||
LOG_WARN("fail to init task allocator", KR(ret));
|
LOG_WARN("fail to init task allocator", KR(ret));
|
||||||
} else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler,
|
} else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler,
|
||||||
(&allocator_), 1, allocator_))) {
|
(&allocator_), 1, table_id, "Client"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||||
|
|||||||
@ -129,7 +129,8 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray<int64_t> &idx_array,
|
|||||||
}
|
}
|
||||||
// init task_scheduler_
|
// init task_scheduler_
|
||||||
else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_),
|
else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_),
|
||||||
ctx_->param_.session_count_, allocator_))) {
|
ctx_->param_.session_count_,
|
||||||
|
ctx_->param_.table_id_, "Coordinator"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||||
}
|
}
|
||||||
|
|||||||
@ -292,8 +292,8 @@ int ObTableLoadMemCompactor::init_scheduler()
|
|||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
// 初始化task_scheduler_
|
// 初始化task_scheduler_
|
||||||
if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_),
|
if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_), 1,
|
||||||
1, allocator_))) {
|
param_->table_id_, "MemCompact"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||||
|
|||||||
@ -166,7 +166,7 @@ int ObTableLoadStoreCtx::init(
|
|||||||
}
|
}
|
||||||
// 初始化task_scheduler_
|
// 初始化task_scheduler_
|
||||||
else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_),
|
else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (&allocator_),
|
||||||
ctx_->param_.session_count_, allocator_))) {
|
ctx_->param_.session_count_, ctx_->param_.table_id_, "Store"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||||
|
|||||||
@ -48,9 +48,10 @@ void ObTableLoadTaskThreadPoolScheduler::MyThreadPool::run1()
|
|||||||
}
|
}
|
||||||
|
|
||||||
ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t thread_count,
|
ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t thread_count,
|
||||||
ObIAllocator &allocator,
|
uint64_t table_id,
|
||||||
|
const char *label,
|
||||||
int64_t session_queue_size)
|
int64_t session_queue_size)
|
||||||
: allocator_(allocator),
|
: allocator_("TLD_ThreadPool"),
|
||||||
thread_count_(thread_count),
|
thread_count_(thread_count),
|
||||||
session_queue_size_(session_queue_size),
|
session_queue_size_(session_queue_size),
|
||||||
timeout_ts_(INT64_MAX),
|
timeout_ts_(INT64_MAX),
|
||||||
@ -59,6 +60,7 @@ ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t t
|
|||||||
state_(STATE_ZERO),
|
state_(STATE_ZERO),
|
||||||
is_inited_(false)
|
is_inited_(false)
|
||||||
{
|
{
|
||||||
|
snprintf(name_, OB_THREAD_NAME_BUF_LEN, "TLD_%03ld_%s", table_id % 1000, label);
|
||||||
}
|
}
|
||||||
|
|
||||||
ObTableLoadTaskThreadPoolScheduler::~ObTableLoadTaskThreadPoolScheduler()
|
ObTableLoadTaskThreadPoolScheduler::~ObTableLoadTaskThreadPoolScheduler()
|
||||||
@ -101,9 +103,8 @@ int ObTableLoadTaskThreadPoolScheduler::init()
|
|||||||
if (IS_INIT) {
|
if (IS_INIT) {
|
||||||
ret = OB_INIT_TWICE;
|
ret = OB_INIT_TWICE;
|
||||||
LOG_WARN("ObTableLoadTaskThreadPoolScheduler init twice", KR(ret), KP(this));
|
LOG_WARN("ObTableLoadTaskThreadPoolScheduler init twice", KR(ret), KP(this));
|
||||||
} else if (OB_FAIL(init_worker_ctx_array())) {
|
|
||||||
LOG_WARN("fail to init worker ctx array", KR(ret));
|
|
||||||
} else {
|
} else {
|
||||||
|
allocator_.set_tenant_id(MTL_ID());
|
||||||
thread_pool_.set_thread_count(thread_count_);
|
thread_pool_.set_thread_count(thread_count_);
|
||||||
thread_pool_.set_run_wrapper(MTL_CTX());
|
thread_pool_.set_run_wrapper(MTL_CTX());
|
||||||
ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
|
ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id();
|
||||||
@ -115,7 +116,11 @@ int ObTableLoadTaskThreadPoolScheduler::init()
|
|||||||
trace_id_.init(zero_addr);
|
trace_id_.init(zero_addr);
|
||||||
}
|
}
|
||||||
timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||||
is_inited_ = true;
|
if (OB_FAIL(init_worker_ctx_array())) {
|
||||||
|
LOG_WARN("fail to init worker ctx array", KR(ret));
|
||||||
|
} else {
|
||||||
|
is_inited_ = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -203,7 +208,8 @@ void ObTableLoadTaskThreadPoolScheduler::after_running()
|
|||||||
void ObTableLoadTaskThreadPoolScheduler::run(uint64_t thread_idx)
|
void ObTableLoadTaskThreadPoolScheduler::run(uint64_t thread_idx)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
// set thread name
|
||||||
|
lib::set_thread_name(name_);
|
||||||
// set trace id
|
// set trace id
|
||||||
ObCurTraceId::set(trace_id_);
|
ObCurTraceId::set(trace_id_);
|
||||||
// set worker timeout
|
// set worker timeout
|
||||||
|
|||||||
@ -9,6 +9,7 @@
|
|||||||
#include "lib/profile/ob_trace_id.h"
|
#include "lib/profile/ob_trace_id.h"
|
||||||
#include "lib/queue/ob_lighty_queue.h"
|
#include "lib/queue/ob_lighty_queue.h"
|
||||||
#include "share/ob_thread_pool.h"
|
#include "share/ob_thread_pool.h"
|
||||||
|
#include "lib/allocator/page_arena.h"
|
||||||
|
|
||||||
namespace oceanbase
|
namespace oceanbase
|
||||||
{
|
{
|
||||||
@ -42,7 +43,7 @@ class ObTableLoadTaskThreadPoolScheduler final : public ObITableLoadTaskSchedule
|
|||||||
static const int STATE_STOPPED = 4;
|
static const int STATE_STOPPED = 4;
|
||||||
static const int STATE_STOPPED_NO_WAIT = 5;
|
static const int STATE_STOPPED_NO_WAIT = 5;
|
||||||
public:
|
public:
|
||||||
ObTableLoadTaskThreadPoolScheduler(int64_t thread_count, common::ObIAllocator &allocator,
|
ObTableLoadTaskThreadPoolScheduler(int64_t thread_count, uint64_t table_id, const char *label,
|
||||||
int64_t session_queue_size = 64);
|
int64_t session_queue_size = 64);
|
||||||
virtual ~ObTableLoadTaskThreadPoolScheduler();
|
virtual ~ObTableLoadTaskThreadPoolScheduler();
|
||||||
int init() override;
|
int init() override;
|
||||||
@ -85,9 +86,10 @@ private:
|
|||||||
};
|
};
|
||||||
int execute_worker_tasks(WorkerContext &worker_ctx);
|
int execute_worker_tasks(WorkerContext &worker_ctx);
|
||||||
private:
|
private:
|
||||||
common::ObIAllocator &allocator_;
|
common::ObArenaAllocator allocator_;
|
||||||
const int64_t thread_count_;
|
const int64_t thread_count_;
|
||||||
const int64_t session_queue_size_;
|
const int64_t session_queue_size_;
|
||||||
|
char name_[OB_THREAD_NAME_BUF_LEN];
|
||||||
common::ObCurTraceId::TraceId trace_id_;
|
common::ObCurTraceId::TraceId trace_id_;
|
||||||
int64_t timeout_ts_;
|
int64_t timeout_ts_;
|
||||||
MyThreadPool thread_pool_;
|
MyThreadPool thread_pool_;
|
||||||
|
|||||||
@ -1052,7 +1052,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &e
|
|||||||
// init task_scheduler_
|
// init task_scheduler_
|
||||||
else if (OB_ISNULL(task_scheduler_ =
|
else if (OB_ISNULL(task_scheduler_ =
|
||||||
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_),
|
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_),
|
||||||
worker_count_, *execute_ctx_->allocator_))) {
|
worker_count_, execute_param_->table_id_, "Parse"))) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
|
||||||
} else if (OB_FAIL(task_scheduler_->init())) {
|
} else if (OB_FAIL(task_scheduler_->init())) {
|
||||||
|
|||||||
Reference in New Issue
Block a user