From 1c6afa2162ee30dd024abd20f40ac2ca2ccd3e17 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Wed, 24 May 2023 08:41:14 +0000 Subject: [PATCH] Fix direct load allocator tenant --- .../table_load/ob_table_load_begin_processor.cpp | 1 + .../table_load/ob_table_load_begin_processor.h | 5 ++++- src/observer/table_load/ob_table_load_csv_parser.cpp | 1 + .../table_load/ob_table_load_finish_processor.h | 5 ++++- src/observer/table_load/ob_table_load_merger.cpp | 1 - src/observer/table_load/ob_table_load_merger.h | 1 - .../ob_table_load_multiple_heap_table_compactor.cpp | 4 +++- .../table_load/ob_table_load_parallel_merge_ctx.cpp | 3 +++ .../table_load/ob_table_load_partition_location.cpp | 1 + src/observer/table_load/ob_table_load_processor.h | 10 ++++++++-- src/share/table/ob_table_load_define.h | 3 ++- src/share/table/ob_table_load_handle.h | 4 +++- src/sql/engine/cmd/ob_load_data_direct_impl.cpp | 4 ++-- src/storage/direct_load/ob_direct_load_mem_dump.cpp | 1 + ...irect_load_multiple_heap_table_index_scan_merge.cpp | 4 +++- .../ob_direct_load_multiple_heap_table_sorter.cpp | 2 ++ .../ob_direct_load_multiple_sstable_compactor.cpp | 4 ++++ .../ob_direct_load_multiple_sstable_scan_merge.cpp | 4 +++- .../ob_direct_load_multiple_sstable_scanner.cpp | 4 +++- .../ob_direct_load_partition_merge_task.cpp | 5 ++++- .../direct_load/ob_direct_load_range_splitter.cpp | 4 ++++ .../direct_load/ob_direct_load_sstable_builder.cpp | 1 + .../direct_load/ob_direct_load_sstable_compactor.cpp | 9 ++++++++- .../direct_load/ob_direct_load_sstable_scan_merge.cpp | 4 +++- 24 files changed, 68 insertions(+), 17 deletions(-) diff --git a/src/observer/table_load/ob_table_load_begin_processor.cpp b/src/observer/table_load/ob_table_load_begin_processor.cpp index a53245da39..e6d6c357f2 100644 --- a/src/observer/table_load/ob_table_load_begin_processor.cpp +++ b/src/observer/table_load/ob_table_load_begin_processor.cpp @@ -28,6 +28,7 @@ using namespace omt; ObTableLoadBeginP::ObTableLoadBeginP(const ObGlobalContext &gctx) : gctx_(gctx), table_ctx_(nullptr) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadBeginP::~ObTableLoadBeginP() diff --git a/src/observer/table_load/ob_table_load_begin_processor.h b/src/observer/table_load/ob_table_load_begin_processor.h index f0e1e6d57c..42838bd0b5 100644 --- a/src/observer/table_load/ob_table_load_begin_processor.h +++ b/src/observer/table_load/ob_table_load_begin_processor.h @@ -45,7 +45,10 @@ class ObTableLoadPreBeginPeerP : public obrpc::ObRpcProcessor > ParentType; public: - explicit ObTableLoadPreBeginPeerP(const ObGlobalContext &gctx) : gctx_(gctx) {} + explicit ObTableLoadPreBeginPeerP(const ObGlobalContext &gctx) : gctx_(gctx) + { + allocator_.set_tenant_id(MTL_ID()); + } virtual ~ObTableLoadPreBeginPeerP() = default; protected: diff --git a/src/observer/table_load/ob_table_load_csv_parser.cpp b/src/observer/table_load/ob_table_load_csv_parser.cpp index 321e861b2a..e58fad2a04 100644 --- a/src/observer/table_load/ob_table_load_csv_parser.cpp +++ b/src/observer/table_load/ob_table_load_csv_parser.cpp @@ -46,6 +46,7 @@ int ObTableLoadCSVParser::init(ObTableLoadTableCtx *table_ctx, const ObString &d LOG_WARN("invalid args", KR(ret), KP(table_ctx), K(data_buffer.length()), K(table_ctx->param_.data_type_)); } else { + allocator_.set_tenant_id(MTL_ID()); column_count_ = table_ctx->param_.column_count_; batch_row_count_ = table_ctx->param_.batch_size_; const int64_t total_obj_count = batch_row_count_ * column_count_; diff --git a/src/observer/table_load/ob_table_load_finish_processor.h b/src/observer/table_load/ob_table_load_finish_processor.h index 49afd16a4e..425ba7761d 100644 --- a/src/observer/table_load/ob_table_load_finish_processor.h +++ b/src/observer/table_load/ob_table_load_finish_processor.h @@ -36,7 +36,10 @@ class ObTableLoadPreMergePeerP : public obrpc::ObRpcProcessor > ParentType; public: - explicit ObTableLoadPreMergePeerP(const ObGlobalContext &gctx) : gctx_(gctx) {} + explicit ObTableLoadPreMergePeerP(const ObGlobalContext &gctx) : gctx_(gctx) + { + allocator_.set_tenant_id(MTL_ID()); + } virtual ~ObTableLoadPreMergePeerP() = default; protected: diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index 290c027c5c..9027d51fda 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -104,7 +104,6 @@ private: ObTableLoadMerger::ObTableLoadMerger(ObTableLoadStoreCtx *store_ctx) : store_ctx_(store_ctx), param_(store_ctx->ctx_->param_), - allocator_("TLD_TLdMerge"), running_thread_count_(0), has_error_(false), is_stop_(false), diff --git a/src/observer/table_load/ob_table_load_merger.h b/src/observer/table_load/ob_table_load_merger.h index 996e7ee186..ecb507d02e 100644 --- a/src/observer/table_load/ob_table_load_merger.h +++ b/src/observer/table_load/ob_table_load_merger.h @@ -44,7 +44,6 @@ private: mutable lib::ObMutex mutex_; ObDirectLoadMergeTaskIterator merge_task_iter_; common::ObDList merging_list_; - common::ObArenaAllocator allocator_; int64_t running_thread_count_ CACHE_ALIGNED; volatile bool has_error_; volatile bool is_stop_; diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index c01c5b22e2..4327762845 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -72,9 +72,11 @@ public: ctx_(ctx), mem_ctx_(mem_ctx), index_dir_id_(-1), - data_dir_id_(-1) + data_dir_id_(-1), + heap_table_allocator_("TLD_MHTCompact") { ctx_->inc_ref_count(); + heap_table_allocator_.set_tenant_id(MTL_ID()); } virtual ~CompactTaskProcessor() { diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp index 4cbca67acb..3b4aa77d80 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp @@ -67,6 +67,8 @@ ObTableLoadParallelMergeTabletCtx::ObTableLoadParallelMergeTabletCtx() range_sstable_count_(0), range_allocator_("TLD_ParalMerge") { + allocator_.set_tenant_id(MTL_ID()); + range_allocator_.set_tenant_id(MTL_ID()); } ObTableLoadParallelMergeTabletCtx::~ObTableLoadParallelMergeTabletCtx() @@ -528,6 +530,7 @@ ObTableLoadParallelMergeCtx::ObTableLoadParallelMergeCtx() is_stop_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadParallelMergeCtx::~ObTableLoadParallelMergeCtx() diff --git a/src/observer/table_load/ob_table_load_partition_location.cpp b/src/observer/table_load/ob_table_load_partition_location.cpp index d3836689a7..7f56da4907 100644 --- a/src/observer/table_load/ob_table_load_partition_location.cpp +++ b/src/observer/table_load/ob_table_load_partition_location.cpp @@ -219,6 +219,7 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) ObHashMap *> addr_map; ObHashMap *>::const_iterator addr_iter; int64_t pos = 0; + tmp_allocator.set_tenant_id(MTL_ID()); // 将所有addr存到set中 if (OB_FAIL(addr_map.create(64, "TLD_PL_Tmp", "TLD_PL_Tmp"))) { LOG_WARN("fail to create hashmap", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_processor.h b/src/observer/table_load/ob_table_load_processor.h index f3e2c4a0db..4c0d887b3a 100644 --- a/src/observer/table_load/ob_table_load_processor.h +++ b/src/observer/table_load/ob_table_load_processor.h @@ -18,7 +18,10 @@ class ObTableLoadP : public obrpc::ObRpcProcessor > ParentType; public: - explicit ObTableLoadP(const ObGlobalContext &gctx) : gctx_(gctx), allocator_("TLD_LoadP") {} + explicit ObTableLoadP(const ObGlobalContext &gctx) : gctx_(gctx), allocator_("TLD_LoadP") + { + allocator_.set_tenant_id(MTL_ID()); + } virtual ~ObTableLoadP() = default; protected: @@ -39,7 +42,10 @@ class ObTableLoadPeerP : public obrpc::ObRpcProcessor > ParentType; public: - explicit ObTableLoadPeerP(const ObGlobalContext &gctx) : gctx_(gctx), allocator_("TLD_PLoadP") {} + explicit ObTableLoadPeerP(const ObGlobalContext &gctx) : gctx_(gctx), allocator_("TLD_PLoadP") + { + allocator_.set_tenant_id(MTL_ID()); + } virtual ~ObTableLoadPeerP() = default; protected: diff --git a/src/share/table/ob_table_load_define.h b/src/share/table/ob_table_load_define.h index cf3024bde2..f6e2a6a189 100644 --- a/src/share/table/ob_table_load_define.h +++ b/src/share/table/ob_table_load_define.h @@ -10,6 +10,7 @@ #include "share/ob_ls_id.h" #include "lib/oblog/ob_log_module.h" #include "lib/utility/ob_print_utils.h" +#include "share/rc/ob_tenant_base.h" #include "share/stat/ob_opt_table_stat.h" #include "share/stat/ob_opt_column_stat.h" #include "share/stat/ob_opt_osg_column_stat.h" @@ -412,7 +413,7 @@ struct ObTableLoadSqlStatistics { OB_UNIS_VERSION(1); public: - ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") {} + ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); } ~ObTableLoadSqlStatistics() { reset();} void reset() { for (int64_t i = 0; i < col_stat_array_.count(); ++i) { diff --git a/src/share/table/ob_table_load_handle.h b/src/share/table/ob_table_load_handle.h index 68478ab2f7..5ea1e80092 100644 --- a/src/share/table/ob_table_load_handle.h +++ b/src/share/table/ob_table_load_handle.h @@ -6,6 +6,7 @@ #define OB_TABLE_LOAD_HANDLE_H_ #include "lib/allocator/ob_malloc.h" +#include "share/rc/ob_tenant_base.h" namespace oceanbase { @@ -38,8 +39,9 @@ public: static ObTableLoadHandle make_handle(Args... args) { + ObMemAttr attr(MTL_ID(), "TLD_Handle"); ObTableLoadHandle handle; - handle.ptr_ = OB_NEW(Object, "TLD_handle", args...); + handle.ptr_ = OB_NEW(Object, attr, args...); handle.ptr_->ref_count_ = 1; return handle; } diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 3ccb767a9d..df04de01f4 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -123,8 +123,8 @@ int ObLoadDataDirectImpl::Logger::init(const ObString &load_info, int64_t max_er } else if (OB_UNLIKELY(load_info.empty())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(load_info)); - } else if (OB_ISNULL( - buf_ = static_cast(ob_malloc(DEFAULT_BUF_LENGTH, ObModIds::OB_SQL_LOAD_DATA)))) { + } else if (OB_ISNULL(buf_ = static_cast( + ob_malloc(DEFAULT_BUF_LENGTH, ObMemAttr(MTL_ID(), "MTL_LogBuffer"))))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate memory", KR(ret)); } else { diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index 8a046e42d4..4943a11ba5 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -30,6 +30,7 @@ ObDirectLoadMemDump::Context::Context() finished_sub_dump_count_(0), sub_dump_count_(0) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMemDump::Context::~Context() diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp index 22351cac4f..fa95d9d204 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp @@ -14,7 +14,8 @@ namespace storage using namespace common; ObDirectLoadMultipleHeapTableIndexScanMerge::ObDirectLoadMultipleHeapTableIndexScanMerge() - : scanners_(nullptr), + : allocator_("TLD_ScanMerge"), + scanners_(nullptr), consumers_(nullptr), consumer_cnt_(0), simple_merge_(nullptr), @@ -39,6 +40,7 @@ int ObDirectLoadMultipleHeapTableIndexScanMerge::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(scanners.count())); } else { + allocator_.set_tenant_id(MTL_ID()); if (scanners.count() > 1) { // init consumers if (OB_ISNULL(consumers_ = static_cast( diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp index 11efa474c9..33bc7b3ad1 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp @@ -21,6 +21,7 @@ using namespace blocksstable; ObDirectLoadMultipleHeapTableSorter::ObDirectLoadMultipleHeapTableSorter( ObDirectLoadMemContext *mem_ctx) : mem_ctx_(mem_ctx), + allocator_("TLD_Sorter"), extra_buf_(nullptr), index_dir_id_(-1), data_dir_id_(-1), @@ -36,6 +37,7 @@ ObDirectLoadMultipleHeapTableSorter::~ObDirectLoadMultipleHeapTableSorter() int ObDirectLoadMultipleHeapTableSorter::init() { int ret = OB_SUCCESS; + allocator_.set_tenant_id(MTL_ID()); if (OB_ISNULL(extra_buf_ = static_cast(allocator_.alloc(mem_ctx_->table_data_desc_.extra_buf_size_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate extra buf", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp index 8204a8faee..53993b7bde 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp @@ -40,6 +40,8 @@ ObDirectLoadMultipleSSTableCompactor::ObDirectLoadMultipleSSTableCompactor() data_block_count_(0), row_count_(0), max_data_block_size_(0), + start_key_allocator_("TLD_SRowkey"), + end_key_allocator_("TLD_ERowkey"), is_inited_(false) { } @@ -59,6 +61,8 @@ int ObDirectLoadMultipleSSTableCompactor::init(const ObDirectLoadMultipleSSTable LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; + start_key_allocator_.set_tenant_id(MTL_ID()); + end_key_allocator_.set_tenant_id(MTL_ID()); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); is_inited_ = true; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp index 05a6c30469..80af0f7136 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp @@ -39,7 +39,8 @@ bool ObDirectLoadMultipleSSTableScanMergeParam::is_valid() const */ ObDirectLoadMultipleSSTableScanMerge::ObDirectLoadMultipleSSTableScanMerge() - : datum_utils_(nullptr), + : allocator_("TLD_ScanMerge"), + datum_utils_(nullptr), dml_row_handler_(nullptr), range_(nullptr), consumers_(nullptr), @@ -92,6 +93,7 @@ int ObDirectLoadMultipleSSTableScanMerge::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(sstable_array), K(range)); } else { + allocator_.set_tenant_id(MTL_ID()); // construct scanners for (int64_t i = 0; OB_SUCC(ret) && i < sstable_array.count(); ++i) { ObDirectLoadMultipleSSTable *sstable = sstable_array.at(i); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp index 4dfb234077..891a9d6d52 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp @@ -16,7 +16,8 @@ using namespace common; using namespace blocksstable; ObDirectLoadMultipleSSTableScanner::ObDirectLoadMultipleSSTableScanner() - : sstable_(nullptr), + : allocator_("TLD_Scanner"), + sstable_(nullptr), range_(nullptr), datum_utils_(nullptr), is_iter_start_(false), @@ -48,6 +49,7 @@ int ObDirectLoadMultipleSSTableScanner::init(ObDirectLoadMultipleSSTable *sstabl table_data_desc_ = table_data_desc; range_ = ⦥ datum_utils_ = datum_utils; + allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(data_block_scanner_.init(sstable, table_data_desc, range, datum_utils))) { LOG_WARN("fail to init data block scanner", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 1dcdb05298..48ba4c39cd 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -36,6 +36,7 @@ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask() is_stop_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask() @@ -825,7 +826,8 @@ int ObDirectLoadPartitionHeapTableMultipleMergeTask::construct_row_iter( */ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::RowIterator() - : origin_iter_(nullptr), + : allocator_("TLD_RowIter"), + origin_iter_(nullptr), rowkey_column_num_(0), store_column_count_(0), heap_table_array_(nullptr), @@ -862,6 +864,7 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::init( LOG_WARN("invalid args", KR(ret), K(merge_param), K(tablet_id), KP(origin_table), KP(heap_table_array)); } else { + allocator_.set_tenant_id(MTL_ID()); range_.set_whole_range(); if (OB_FAIL(origin_table->scan(range_, allocator_, origin_iter_))) { LOG_WARN("fail to scan origin table", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_range_splitter.cpp b/src/storage/direct_load/ob_direct_load_range_splitter.cpp index 8b180d914e..bb033370bb 100644 --- a/src/storage/direct_load/ob_direct_load_range_splitter.cpp +++ b/src/storage/direct_load/ob_direct_load_range_splitter.cpp @@ -368,6 +368,7 @@ int ObDirectLoadSSTableRangeSplitter::init(const ObIArray ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(sstable_array), KP(datum_utils)); } else { + allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(construct_rowkey_iters(sstable_array))) { LOG_WARN("fail to construct rowkey itres", KR(ret)); } else if (OB_FAIL( @@ -452,6 +453,7 @@ int ObDirectLoadMergeRangeSplitter::init(ObDirectLoadOriginTable *origin_table, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KPC(origin_table), K(sstable_array), KP(datum_utils)); } else { + allocator_.set_tenant_id(MTL_ID()); scan_range_.set_whole_range(); if (OB_FAIL(construct_origin_table_rowkey_iter(origin_table))) { LOG_WARN("fail to construct origin sstable rowkey itre", KR(ret)); @@ -567,6 +569,7 @@ int ObDirectLoadMultipleMergeTabletRangeSplitter::init( K(table_data_desc), KP(datum_utils)); } else { tablet_id_ = tablet_id; + allocator_.set_tenant_id(MTL_ID()); scan_range_.set_whole_range(); if (OB_FAIL(construct_origin_table_rowkey_iter(origin_table))) { LOG_WARN("fail to construct origin sstable rowkey itre", KR(ret)); @@ -1036,6 +1039,7 @@ int ObDirectLoadMultipleSSTableRangeSplitter::init( LOG_WARN("invalid args", KR(ret), K(sstable_array), K(table_data_desc), KP(datum_utils)); } else { datum_utils_ = datum_utils; + allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(construct_rowkey_iters(sstable_array, table_data_desc, datum_utils))) { LOG_WARN("fail to construct rowkey iters", KR(ret)); } else if (OB_FAIL(compare_.init(*datum_utils))) { diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp index 1883dab93d..53caab3220 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp @@ -47,6 +47,7 @@ int ObDirectLoadSSTableBuilder::init(const ObDirectLoadSSTableBuildParam ¶m) const uint64_t tenant_id = MTL_ID(); param_ = param; allocator_.set_tenant_id(tenant_id); + rowkey_allocator_.set_tenant_id(tenant_id); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); int64_t dir_id = -1; diff --git a/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp b/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp index cb4b2387a5..dd0b16de49 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp @@ -36,7 +36,12 @@ bool ObDirectLoadSSTableCompactParam::is_valid() const */ ObDirectLoadSSTableCompactor::ObDirectLoadSSTableCompactor() - : index_item_count_(0), index_block_count_(0), row_count_(0), is_inited_(false) + : index_item_count_(0), + index_block_count_(0), + row_count_(0), + start_key_allocator_("TLD_SRowkey"), + end_key_allocator_("TLD_ERowkey"), + is_inited_(false) { } @@ -55,6 +60,8 @@ int ObDirectLoadSSTableCompactor::init(const ObDirectLoadSSTableCompactParam &pa LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; + start_key_allocator_.set_tenant_id(MTL_ID()); + end_key_allocator_.set_tenant_id(MTL_ID()); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); is_inited_ = true; diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp index 345d124313..32344244b8 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp @@ -41,7 +41,8 @@ bool ObDirectLoadSSTableScanMergeParam::is_valid() const */ ObDirectLoadSSTableScanMerge::ObDirectLoadSSTableScanMerge() - : datum_utils_(nullptr), + : allocator_("TLD_ScanMerge"), + datum_utils_(nullptr), dml_row_handler_(nullptr), range_(nullptr), consumers_(nullptr), @@ -97,6 +98,7 @@ int ObDirectLoadSSTableScanMerge::init(const ObDirectLoadSSTableScanMergeParam & ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(sstable_array), K(range)); } else { + allocator_.set_tenant_id(MTL_ID()); // construct scanners for (int64_t i = 0; OB_SUCC(ret) && i < sstable_array.count(); ++i) { ObDirectLoadSSTable *sstable = sstable_array.at(i);