From 2597a7560180b2bdfdd62ffd5462a9902ca6b52a Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 7 Feb 2024 23:48:13 +0000 Subject: [PATCH] [CP] Fix direct load use non-thread-safe allocator to concurrent allocate memory --- src/sql/engine/cmd/ob_load_data_direct_impl.cpp | 8 +++++--- src/sql/engine/cmd/ob_load_data_direct_impl.h | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index b26951703d..3ea8d493b8 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -435,7 +435,8 @@ void ObLoadDataDirectImpl::DataBuffer::swap(DataBuffer &other) */ ObLoadDataDirectImpl::DataReader::DataReader() - : execute_ctx_(nullptr), + : allocator_("TLD_DataReader"), + execute_ctx_(nullptr), file_reader_(nullptr), end_offset_(0), read_raw_(false), @@ -461,6 +462,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa ret = OB_INIT_TWICE; LOG_WARN("ObLoadDataDirectImpl::DataReader init twice", KR(ret), KP(this)); } else { + allocator_.set_tenant_id(MTL_ID()); execute_ctx_ = &execute_ctx; read_raw_ = read_raw; if (OB_FAIL(csv_parser_.init(data_access_param.file_format_, data_access_param.file_column_num_, @@ -470,7 +472,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa if (OB_SUCC(ret) && !read_raw) { ObCSVFormats formats; formats.init(data_access_param.file_format_); - if (OB_FAIL(data_trimer_.init(*execute_ctx_->allocator_, formats))) { + if (OB_FAIL(data_trimer_.init(allocator_, formats))) { LOG_WARN("fail to init data trimer", KR(ret)); } } @@ -485,7 +487,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info(); file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts(); - if (OB_FAIL(ObFileReader::open(file_read_param, *execute_ctx_->allocator_, file_reader_))) { + if (OB_FAIL(ObFileReader::open(file_read_param, allocator_, file_reader_))) { LOG_WARN("failed to open file", KR(ret), K(data_desc)); } else if (file_reader_->seekable()) { diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 0cf7c5537a..7c6d6f4e86 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -215,6 +215,7 @@ private: int read_buffer(ObLoadFileBuffer &file_buffer); private: + ObArenaAllocator allocator_; LoadExecuteContext *execute_ctx_; ObCSVGeneralParser csv_parser_; // 用来计算完整行 ObLoadFileDataTrimer data_trimer_; // 缓存不完整行的数据