Remove the max_table_load_session_count restriction on direct-load data

This commit is contained in:
leftgeek
2023-05-15 03:46:35 +00:00
committed by ob-robot
parent 907db4b32b
commit 0ff05b2832
11 changed files with 98 additions and 51 deletions

View File

@ -1018,7 +1018,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor()
task_scheduler_ = nullptr;
}
if (nullptr != worker_ctx_array_) {
for (int64_t i = 0; i < execute_param_->parallel_; ++i) {
for (int64_t i = 0; i < execute_param_->thread_count_; ++i) {
WorkerContext *worker_ctx = worker_ctx_array_ + i;
worker_ctx->~WorkerContext();
}
@ -1047,7 +1047,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &e
// init task_scheduler_
else if (OB_ISNULL(task_scheduler_ =
OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_),
execute_param_->parallel_, *execute_ctx_->allocator_))) {
execute_param_->thread_count_, *execute_ctx_->allocator_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret));
} else if (OB_FAIL(task_scheduler_->init())) {
@ -1092,12 +1092,12 @@ int ObLoadDataDirectImpl::FileLoadExecutor::init_worker_ctx_array()
int ret = OB_SUCCESS;
void *buf = nullptr;
if (OB_ISNULL(
buf = execute_ctx_->allocator_->alloc(sizeof(WorkerContext) * execute_param_->parallel_))) {
buf = execute_ctx_->allocator_->alloc(sizeof(WorkerContext) * execute_param_->thread_count_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate memory", KR(ret));
} else {
worker_ctx_array_ = new (buf) WorkerContext[execute_param_->parallel_];
for (int64_t i = 0; OB_SUCC(ret) && i < execute_param_->parallel_; ++i) {
worker_ctx_array_ = new (buf) WorkerContext[execute_param_->thread_count_];
for (int64_t i = 0; OB_SUCC(ret) && i < execute_param_->thread_count_; ++i) {
WorkerContext *worker_ctx = worker_ctx_array_ + i;
if (OB_FAIL(worker_ctx->data_parser_.init(execute_param_->data_access_param_,
*execute_ctx_->logger_))) {
@ -1292,7 +1292,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_i
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this));
} else if (OB_UNLIKELY(worker_idx < 0 || worker_idx >= execute_param_->parallel_ ||
} else if (OB_UNLIKELY(worker_idx < 0 || worker_idx >= execute_param_->thread_count_ ||
nullptr == handle)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), KPC(execute_param_), K(worker_idx), KP(handle));
@ -1527,7 +1527,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::fill_task(TaskHandle *handle,
int32_t ObLoadDataDirectImpl::LargeFileLoadExecutor::get_session_id()
{
if (next_session_id_ > execute_param_->parallel_) {
if (next_session_id_ > execute_param_->thread_count_) {
next_session_id_ = 1;
}
return next_session_id_++;
@ -1827,7 +1827,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
DataDesc data_desc;
data_desc.filename_ = load_args.file_name_;
if (OB_FAIL(SimpleDataSplitUtils::split(execute_param_.data_access_param_, data_desc,
execute_param_.parallel_, data_desc_iter))) {
execute_param_.thread_count_, data_desc_iter))) {
LOG_WARN("fail to split data", KR(ret));
}
} else {
@ -1914,9 +1914,10 @@ int ObLoadDataDirectImpl::init_execute_param()
hint_parallel = load_args.file_iter_.count() > 1
? MIN(hint_parallel, load_args.file_iter_.count())
: hint_parallel;
execute_param_.parallel_ = MIN(hint_parallel, (int64_t)tenant->unit_max_cpu());
execute_param_.parallel_ = hint_parallel;
execute_param_.thread_count_ = MIN(hint_parallel, (int64_t)tenant->unit_max_cpu());
execute_param_.data_mem_usage_limit_ =
MIN(execute_param_.parallel_ * 2, MAX_DATA_MEM_USAGE_LIMIT);
MIN(execute_param_.thread_count_ * 2, MAX_DATA_MEM_USAGE_LIMIT);
}
}
// batch_row_count_
@ -2077,7 +2078,8 @@ int ObLoadDataDirectImpl::init_execute_context()
ObTableLoadParam load_param;
load_param.tenant_id_ = execute_param_.tenant_id_;
load_param.table_id_ = execute_param_.table_id_;
load_param.session_count_ = execute_param_.parallel_;
load_param.parallel_ = execute_param_.parallel_;
load_param.session_count_ = execute_param_.thread_count_;
load_param.batch_size_ = execute_param_.batch_row_count_;
load_param.max_error_row_count_ = execute_param_.max_error_rows_;
load_param.column_count_ = execute_param_.store_column_idxs_.count();
@ -2129,7 +2131,7 @@ int ObLoadDataDirectImpl::init_logger()
session->get_tenant_name().length(), session->get_tenant_name().ptr(),
load_args.file_name_.length(), load_args.file_name_.ptr(),
load_args.combined_name_.length(), load_args.combined_name_.ptr(),
execute_param_.parallel_, execute_param_.batch_row_count_,
execute_param_.thread_count_, execute_param_.batch_row_count_,
ObCurTraceId::get_trace_id_str()));
OZ(databuff_printf(buf, buf_len, pos, "Start time:\t"));
OZ(ObTimeConverter::datetime_to_str(current_time, TZ_INFO(session), ObString(),

View File

@ -67,7 +67,7 @@ private:
public:
LoadExecuteParam();
bool is_valid() const;
TO_STRING_KV(K_(tenant_id), K_(database_id), K_(table_id), K_(combined_name), K_(parallel),
TO_STRING_KV(K_(tenant_id), K_(database_id), K_(table_id), K_(combined_name), K_(parallel), K_(thread_count),
K_(batch_row_count), K_(data_mem_usage_limit), K_(need_sort), K_(online_opt_stat_gather),
K_(max_error_rows), K_(ignore_row_num), K_(data_access_param), K_(store_column_idxs));
public:
@ -78,7 +78,8 @@ private:
common::ObString database_name_;
common::ObString table_name_;
common::ObString combined_name_; // database name + table name
int64_t parallel_; // number of concurrent threads
int64_t parallel_;
int64_t thread_count_; // number of concurrent threads
int64_t batch_row_count_;
int64_t data_mem_usage_limit_; // limit = data_mem_usage_limit * MAX_BUFFER_SIZE
bool need_sort_;

View File

@ -10,6 +10,7 @@
#include "observer/table_load/ob_table_load_struct.h"
#include "observer/table_load/ob_table_load_instance.h"
#include "share/schema/ob_schema_getter_guard.h"
#include "observer/omt/ob_tenant.h"
namespace oceanbase
{
@ -48,7 +49,10 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
load_exec_ctx_->allocator_ = &(exec_ctx->get_allocator());
uint64_t sql_mode = 0;
ObSEArray<int64_t, 16> store_column_idxs;
if (OB_FAIL(init_store_column_idxs(MTL_ID(), table_id, store_column_idxs))) {
omt::ObTenant *tenant = nullptr;
if (OB_FAIL(GCTX.omt_->get_tenant(MTL_ID(), tenant))) {
LOG_WARN("fail to get tenant handle", KR(ret), K(MTL_ID()));
} else if (OB_FAIL(init_store_column_idxs(MTL_ID(), table_id, store_column_idxs))) {
LOG_WARN("failed to init store column idxs", KR(ret));
} else if (OB_FAIL(exec_ctx->get_my_session()->get_sys_variable(SYS_VAR_SQL_MODE, sql_mode))) {
LOG_WARN("fail to get sys variable", KR(ret));
@ -58,7 +62,8 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
param.tenant_id_ = MTL_ID();
param.table_id_ = table_id;
param.batch_size_ = 100;
param.session_count_ = parallel;
param.parallel_ = parallel;
param.session_count_ = MIN(parallel, (int64_t)tenant->unit_max_cpu());
param.px_mode_ = true;
param.online_opt_stat_gather_ = false;
param.need_sort_ = true;