From 75ba775570a787fdaccd50bda4f1a9b785ede874 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Thu, 18 May 2023 00:16:53 +0000 Subject: [PATCH] Fix direct load csv parser thread count --- .../engine/cmd/ob_load_data_direct_impl.cpp | 84 +++++++++---------- src/sql/engine/cmd/ob_load_data_direct_impl.h | 21 +++-- 2 files changed, 52 insertions(+), 53 deletions(-) diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 5d0eea5cb0..504b9d6454 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -50,6 +50,7 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam() table_id_(OB_INVALID_ID), sql_mode_(0), parallel_(0), + thread_count_(0), batch_row_count_(0), data_mem_usage_limit_(0), need_sort_(false), @@ -64,7 +65,7 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const { return OB_INVALID_ID != tenant_id_ && OB_INVALID_ID != database_id_ && OB_INVALID_ID != table_id_ && !database_name_.empty() && !table_name_.empty() && - !combined_name_.empty() && parallel_ > 0 && batch_row_count_ > 0 && + !combined_name_.empty() && parallel_ > 0 && thread_count_ > 0 && batch_row_count_ > 0 && data_mem_usage_limit_ > 0 && max_error_rows_ >= 0 && ignore_row_num_ >= 0 && ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && data_access_param_.is_valid() && !store_column_idxs_.empty(); @@ -1002,6 +1003,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor() : execute_param_(nullptr), execute_ctx_(nullptr), task_scheduler_(nullptr), + worker_count_(0), worker_ctx_array_(nullptr), total_line_count_(0), is_inited_(false) @@ -1018,7 +1020,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor() task_scheduler_ = nullptr; } if (nullptr != worker_ctx_array_) { - for (int64_t i = 0; i < execute_param_->thread_count_; ++i) { + for (int64_t i = 0; i < worker_count_; ++i) { WorkerContext *worker_ctx = worker_ctx_array_ + i; worker_ctx->~WorkerContext(); } @@ -1035,11 +1037,12 @@ ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor() int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx, - int64_t handle_count) + int64_t worker_count, int64_t handle_count) { int ret = OB_SUCCESS; execute_param_ = &execute_param; execute_ctx_ = &execute_ctx; + worker_count_ = worker_count; // init task_allocator_ if (OB_FAIL(task_allocator_.init("TLD_TaskPool", execute_param_->tenant_id_))) { LOG_WARN("fail to init allocator", KR(ret)); @@ -1047,7 +1050,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::inner_init(const LoadExecuteParam &e // init task_scheduler_ else if (OB_ISNULL(task_scheduler_ = OB_NEWx(ObTableLoadTaskThreadPoolScheduler, (execute_ctx_->allocator_), - execute_param_->thread_count_, *execute_ctx_->allocator_))) { + worker_count_, *execute_ctx_->allocator_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new ObTableLoadTaskThreadPoolScheduler", KR(ret)); } else if (OB_FAIL(task_scheduler_->init())) { @@ -1091,13 +1094,12 @@ int ObLoadDataDirectImpl::FileLoadExecutor::init_worker_ctx_array() { int ret = OB_SUCCESS; void *buf = nullptr; - if (OB_ISNULL( - buf = execute_ctx_->allocator_->alloc(sizeof(WorkerContext) * execute_param_->thread_count_))) { + if (OB_ISNULL(buf = execute_ctx_->allocator_->alloc(sizeof(WorkerContext) * worker_count_))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate memory", KR(ret)); } else { - worker_ctx_array_ = new (buf) WorkerContext[execute_param_->thread_count_]; - for (int64_t i = 0; OB_SUCC(ret) && i < execute_param_->thread_count_; ++i) { + worker_ctx_array_ = new (buf) WorkerContext[worker_count_]; + for (int64_t i = 0; OB_SUCC(ret) && i < worker_count_; ++i) { WorkerContext *worker_ctx = worker_ctx_array_ + i; if (OB_FAIL(worker_ctx->data_parser_.init(execute_param_->data_access_param_, *execute_ctx_->logger_))) { @@ -1139,8 +1141,8 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute() LOG_WARN("fail to alloc task", KR(ret)); } else if (OB_FAIL(fill_task(handle, task))) { LOG_WARN("fail to fill task", KR(ret)); - } else if (OB_FAIL(task_scheduler_->add_task(handle->session_id_ - 1, task))) { - LOG_WARN("fail to add task", KR(ret), K(handle->session_id_), KPC(task)); + } else if (OB_FAIL(task_scheduler_->add_task(handle->worker_idx_, task))) { + LOG_WARN("fail to add task", KR(ret), K(handle->worker_idx_), KPC(task)); } if (OB_FAIL(ret)) { if (nullptr != task) { @@ -1284,20 +1286,19 @@ void ObLoadDataDirectImpl::FileLoadExecutor::wait_all_task_finished() task_controller_.wait_all_task_finish(execute_param_->combined_name_.ptr(), THIS_WORKER.get_timeout_ts()); } -int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(int64_t worker_idx, - TaskHandle *handle, +int ObLoadDataDirectImpl::FileLoadExecutor::process_task_handle(TaskHandle *handle, int64_t &parsed_line_count) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObLoadDataDirectImpl::FileLoadExecutor not init", KR(ret), KP(this)); - } else if (OB_UNLIKELY(worker_idx < 0 || worker_idx >= execute_param_->thread_count_ || - nullptr == handle)) { + } else if (OB_UNLIKELY(nullptr == handle || handle->worker_idx_ < 0 || + handle->worker_idx_ >= worker_count_)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), KPC(execute_param_), K(worker_idx), KP(handle)); + LOG_WARN("invalid args", KR(ret), KPC(execute_param_), KP(handle)); } else { - WorkerContext &worker_ctx = worker_ctx_array_[worker_idx]; + WorkerContext &worker_ctx = worker_ctx_array_[handle->worker_idx_]; const int64_t column_count = execute_param_->data_access_param_.file_column_num_; const int64_t data_buffer_length = handle->data_buffer_.get_data_length(); int64_t parsed_bytes = 0; @@ -1400,11 +1401,8 @@ class ObLoadDataDirectImpl::LargeFileLoadTaskProcessor : public ObITableLoadTask { public: LargeFileLoadTaskProcessor(ObTableLoadTask &task, FileLoadExecutor *file_load_executor, - int64_t worker_idx, TaskHandle *handle) - : ObITableLoadTaskProcessor(task), - file_load_executor_(file_load_executor), - worker_idx_(worker_idx), - handle_(handle) + TaskHandle *handle) + : ObITableLoadTaskProcessor(task), file_load_executor_(file_load_executor), handle_(handle) { } virtual ~LargeFileLoadTaskProcessor() = default; @@ -1412,7 +1410,6 @@ public: INHERIT_TO_STRING_KV("task_processor", ObITableLoadTaskProcessor, KPC_(handle)); private: FileLoadExecutor *file_load_executor_; - int64_t worker_idx_; TaskHandle *handle_; }; @@ -1421,7 +1418,7 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process() int ret = OB_SUCCESS; handle_->result_.start_process_ts_ = ObTimeUtil::current_time(); int64_t line_count = 0; - if (OB_FAIL(file_load_executor_->process_task_handle(worker_idx_, handle_, line_count))) { + if (OB_FAIL(file_load_executor_->process_task_handle(handle_, line_count))) { LOG_WARN("fail to process task handle", KR(ret)); } return ret; @@ -1432,7 +1429,7 @@ int ObLoadDataDirectImpl::LargeFileLoadTaskProcessor::process() */ ObLoadDataDirectImpl::LargeFileLoadExecutor::LargeFileLoadExecutor() - : next_session_id_(1) + : next_worker_idx_(0) { } @@ -1455,7 +1452,8 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::init(const LoadExecuteParam &ex } else { DataDescIterator copy_data_desc_iter; DataDesc data_desc; - if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.data_mem_usage_limit_))) { + if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.thread_count_, + execute_param.data_mem_usage_limit_))) { LOG_WARN("fail to init inner", KR(ret)); } // data_desc_ @@ -1502,7 +1500,8 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::get_next_task_handle(TaskHandle LOG_WARN("fail to fetch task handle", KR(ret)); } else { handle->task_id_ = task_controller_.get_next_task_id(); - handle->session_id_ = get_session_id(); + handle->worker_idx_ = get_worker_idx(); + handle->session_id_ = handle->worker_idx_ + 1; handle->data_desc_ = data_desc_; handle->start_line_no_ = total_line_count_ + 1; handle->result_.created_ts_ = ObTimeUtil::current_time(); @@ -1516,8 +1515,7 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::fill_task(TaskHandle *handle, ObTableLoadTask *task) { int ret = OB_SUCCESS; - if (OB_FAIL( - task->set_processor(this, handle->session_id_ - 1, handle))) { + if (OB_FAIL(task->set_processor(this, handle))) { LOG_WARN("fail to set large file load task processor", KR(ret)); } else if (OB_FAIL(task->set_callback(this, handle))) { LOG_WARN("fail to set file load task callback", KR(ret)); @@ -1525,12 +1523,12 @@ int ObLoadDataDirectImpl::LargeFileLoadExecutor::fill_task(TaskHandle *handle, return ret; } -int32_t ObLoadDataDirectImpl::LargeFileLoadExecutor::get_session_id() +int64_t ObLoadDataDirectImpl::LargeFileLoadExecutor::get_worker_idx() { - if (next_session_id_ > execute_param_->thread_count_) { - next_session_id_ = 1; + if (next_worker_idx_ >= worker_count_) { + next_worker_idx_ = 0; } - return next_session_id_++; + return next_worker_idx_++; } int ObLoadDataDirectImpl::LargeFileLoadExecutor::skip_ignore_rows() @@ -1575,12 +1573,11 @@ class ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor : public ObITableLoadTas public: MultiFilesLoadTaskProcessor(ObTableLoadTask &task, const LoadExecuteParam *execute_param, LoadExecuteContext *execute_ctx, FileLoadExecutor *file_load_executor, - int64_t worker_idx, TaskHandle *handle) + TaskHandle *handle) : ObITableLoadTaskProcessor(task), execute_param_(execute_param), execute_ctx_(execute_ctx), file_load_executor_(file_load_executor), - worker_idx_(worker_idx), handle_(handle) { } @@ -1593,7 +1590,6 @@ private: const LoadExecuteParam *execute_param_; LoadExecuteContext *execute_ctx_; FileLoadExecutor *file_load_executor_; - int64_t worker_idx_; TaskHandle *handle_; DataReader data_reader_; }; @@ -1617,8 +1613,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process() handle_->data_buffer_.is_end_file_ = data_reader_.is_end_file(); handle_->start_line_no_ = handle_->result_.parsed_row_count_ + 1; current_line_count = 0; - if (OB_FAIL( - file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) { + if (OB_FAIL(file_load_executor_->process_task_handle(handle_, current_line_count))) { LOG_WARN("fail to process task handle", KR(ret)); } } @@ -1637,8 +1632,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process() handle_->data_buffer_.is_end_file_ = data_reader_.is_end_file(); handle_->start_line_no_ = handle_->result_.parsed_row_count_ + 1; current_line_count = 0; - if (OB_FAIL( - file_load_executor_->process_task_handle(worker_idx_, handle_, current_line_count))) { + if (OB_FAIL(file_load_executor_->process_task_handle(handle_, current_line_count))) { LOG_WARN("fail to process task handle", KR(ret)); } else if (OB_UNLIKELY(0 == current_line_count)) { ret = OB_NOT_SUPPORTED; @@ -1718,7 +1712,8 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::init(const LoadExecuteParam &e ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(execute_param), K(execute_ctx), K(data_desc_iter)); } else { - if (OB_FAIL(inner_init(execute_param, execute_ctx, execute_param.parallel_))) { + const int64_t parse_thread_count = MIN(data_desc_iter.count(), execute_param.thread_count_); + if (OB_FAIL(inner_init(execute_param, execute_ctx, parse_thread_count, parse_thread_count))) { LOG_WARN("fail to init inner", KR(ret)); } else if (OB_FAIL(data_desc_iter_.copy(data_desc_iter))) { LOG_WARN("fail to copy data desc iter", KR(ret)); @@ -1733,7 +1728,9 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::prepare_execute() { int ret = OB_SUCCESS; for (int64_t i = 0; i < handle_resource_.count(); ++i) { - handle_resource_.at(i)->session_id_ = i + 1; + TaskHandle *task_handle = handle_resource_.at(i); + task_handle->worker_idx_ = i; + task_handle->session_id_ = i + 1; } return ret; } @@ -1762,7 +1759,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadExecutor::fill_task(TaskHandle *handle, { int ret = OB_SUCCESS; if (OB_FAIL(task->set_processor(execute_param_, execute_ctx_, this, - handle->session_id_ - 1, handle))) { + handle))) { LOG_WARN("fail to set multi files load task processor", KR(ret)); } else if (OB_FAIL(task->set_callback(this, handle))) { LOG_WARN("fail to set file load task callback", KR(ret)); @@ -1911,9 +1908,6 @@ int ObLoadDataDirectImpl::init_execute_param() LOG_WARN("fail to get tenant handle", KR(ret), K(execute_param_.tenant_id_)); } else { hint_parallel = hint_parallel > 0 ? hint_parallel : DEFAULT_PARALLEL_THREAD_COUNT; - hint_parallel = load_args.file_iter_.count() > 1 - ? MIN(hint_parallel, load_args.file_iter_.count()) - : hint_parallel; execute_param_.parallel_ = hint_parallel; execute_param_.thread_count_ = MIN(hint_parallel, (int64_t)tenant->unit_max_cpu()); execute_param_.data_mem_usage_limit_ = diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index ee9cdcbb04..af41c91a02 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -335,15 +335,19 @@ private: struct TaskHandle { - TaskHandle() : task_id_(common::OB_INVALID_ID), session_id_(0), start_line_no_(0) {} + TaskHandle() + : task_id_(common::OB_INVALID_ID), worker_idx_(-1), session_id_(0), start_line_no_(0) + { + } int64_t task_id_; DataBuffer data_buffer_; - int32_t session_id_; + int64_t worker_idx_; // parse thread idx + int32_t session_id_; // table load session id DataDesc data_desc_; int64_t start_line_no_; // 从1开始 TaskResult result_; - TO_STRING_KV(K_(task_id), K_(data_buffer), K_(session_id), K_(data_desc), K_(start_line_no), - K_(result)); + TO_STRING_KV(K_(task_id), K_(data_buffer), K_(worker_idx), K_(session_id), K_(data_desc), + K_(start_line_no), K_(result)); private: DISALLOW_COPY_AND_ASSIGN(TaskHandle); }; @@ -359,7 +363,7 @@ private: int alloc_task(observer::ObTableLoadTask *&task); void free_task(observer::ObTableLoadTask *task); void task_finished(TaskHandle *handle); - int process_task_handle(int64_t worker_idx, TaskHandle *handle, int64_t &line_count); + int process_task_handle(TaskHandle *handle, int64_t &line_count); int64_t get_total_line_count() const {return total_line_count_; } protected: virtual int prepare_execute() = 0; @@ -367,7 +371,7 @@ private: virtual int fill_task(TaskHandle *handle, observer::ObTableLoadTask *task) = 0; protected: int inner_init(const LoadExecuteParam &execute_param, LoadExecuteContext &execute_ctx, - int64_t handle_count); + int64_t worker_count, int64_t handle_count); int init_worker_ctx_array(); int fetch_task_handle(TaskHandle *&handle); int handle_task_result(int64_t task_id, TaskResult &result); @@ -385,6 +389,7 @@ private: LoadExecuteContext *execute_ctx_; observer::ObTableLoadObjectAllocator task_allocator_; observer::ObITableLoadTaskScheduler *task_scheduler_; + int64_t worker_count_; // <= thread_count_ WorkerContext *worker_ctx_array_; // task ctrl ObParallelTaskController task_controller_; @@ -415,13 +420,13 @@ private: int get_next_task_handle(TaskHandle *&handle) override; int fill_task(TaskHandle *handle, observer::ObTableLoadTask *task) override; private: - int32_t get_session_id(); + int64_t get_worker_idx(); int skip_ignore_rows(); private: DataDesc data_desc_; DataBuffer expr_buffer_; DataReader data_reader_; - int32_t next_session_id_; + int64_t next_worker_idx_; DISALLOW_COPY_AND_ASSIGN(LargeFileLoadExecutor); };