[CP] Fix direct load use non-thread-safe allocator to concurrent allocate memory
This commit is contained in:
@ -435,7 +435,8 @@ void ObLoadDataDirectImpl::DataBuffer::swap(DataBuffer &other)
|
||||
*/
|
||||
|
||||
ObLoadDataDirectImpl::DataReader::DataReader()
|
||||
: execute_ctx_(nullptr),
|
||||
: allocator_("TLD_DataReader"),
|
||||
execute_ctx_(nullptr),
|
||||
file_reader_(nullptr),
|
||||
end_offset_(0),
|
||||
read_raw_(false),
|
||||
@ -461,6 +462,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObLoadDataDirectImpl::DataReader init twice", KR(ret), KP(this));
|
||||
} else {
|
||||
allocator_.set_tenant_id(MTL_ID());
|
||||
execute_ctx_ = &execute_ctx;
|
||||
read_raw_ = read_raw;
|
||||
if (OB_FAIL(csv_parser_.init(data_access_param.file_format_, data_access_param.file_column_num_,
|
||||
@ -470,7 +472,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
||||
if (OB_SUCC(ret) && !read_raw) {
|
||||
ObCSVFormats formats;
|
||||
formats.init(data_access_param.file_format_);
|
||||
if (OB_FAIL(data_trimer_.init(*execute_ctx_->allocator_, formats))) {
|
||||
if (OB_FAIL(data_trimer_.init(allocator_, formats))) {
|
||||
LOG_WARN("fail to init data trimer", KR(ret));
|
||||
}
|
||||
}
|
||||
@ -485,7 +487,7 @@ int ObLoadDataDirectImpl::DataReader::init(const DataAccessParam &data_access_pa
|
||||
file_read_param.session_ = execute_ctx.exec_ctx_.get_session_info();
|
||||
file_read_param.timeout_ts_ = THIS_WORKER.get_timeout_ts();
|
||||
|
||||
if (OB_FAIL(ObFileReader::open(file_read_param, *execute_ctx_->allocator_, file_reader_))) {
|
||||
if (OB_FAIL(ObFileReader::open(file_read_param, allocator_, file_reader_))) {
|
||||
LOG_WARN("failed to open file", KR(ret), K(data_desc));
|
||||
} else if (file_reader_->seekable()) {
|
||||
|
||||
|
@ -215,6 +215,7 @@ private:
|
||||
int read_buffer(ObLoadFileBuffer &file_buffer);
|
||||
|
||||
private:
|
||||
ObArenaAllocator allocator_;
|
||||
LoadExecuteContext *execute_ctx_;
|
||||
ObCSVGeneralParser csv_parser_; // 用来计算完整行
|
||||
ObLoadFileDataTrimer data_trimer_; // 缓存不完整行的数据
|
||||
|
Reference in New Issue
Block a user