From 8cf87984dbf0cf7b6759bb532666276e1aa8d54c Mon Sep 17 00:00:00 2001 From: suz-yang Date: Wed, 28 Feb 2024 03:14:58 +0000 Subject: [PATCH] Direct load memory split tenant. --- .../ob_table_load_client_service.cpp | 1 + .../table_load/ob_table_load_client_task.cpp | 5 ++++- .../table_load/ob_table_load_coordinator.cpp | 3 +++ .../ob_table_load_coordinator_ctx.cpp | 4 +++- .../table_load/ob_table_load_coordinator_ctx.h | 2 +- .../ob_table_load_general_table_compactor.cpp | 11 +++++++---- .../ob_table_load_general_table_compactor.h | 4 ++-- .../table_load/ob_table_load_mem_compactor.cpp | 5 +++-- .../table_load/ob_table_load_merger.cpp | 16 +++++++++++----- ...able_load_multiple_heap_table_compactor.cpp | 7 +++++-- .../ob_table_load_parallel_merge_ctx.cpp | 11 +++++++++-- .../ob_table_load_parallel_merge_ctx.h | 4 ++-- ...ble_load_parallel_merge_table_compactor.cpp | 3 ++- .../ob_table_load_partition_calc.cpp | 7 +++++-- .../ob_table_load_partition_location.cpp | 18 ++++++++++++------ .../ob_table_load_partition_location.h | 2 +- .../table_load/ob_table_load_schema.cpp | 7 +++++-- .../table_load/ob_table_load_service.cpp | 8 ++++++++ .../table_load/ob_table_load_store.cpp | 10 +++++++--- .../table_load/ob_table_load_store_ctx.cpp | 6 +++++- .../table_load/ob_table_load_store_ctx.h | 2 +- .../ob_table_load_table_compactor.cpp | 3 ++- .../table_load/ob_table_load_table_compactor.h | 5 ++++- .../table_load/ob_table_load_table_ctx.cpp | 2 +- src/observer/table_load/ob_table_load_task.cpp | 3 ++- .../ob_table_load_task_scheduler.cpp | 2 +- .../ob_table_load_trans_bucket_writer.cpp | 13 +++++++++---- .../ob_table_load_trans_bucket_writer.h | 2 +- .../table_load/ob_table_load_trans_ctx.cpp | 3 ++- .../table_load/ob_table_load_trans_store.cpp | 10 +++++----- .../table_load/ob_table_load_trans_store.h | 13 ++++++++++--- src/share/table/ob_table_load_dml_stat.h | 8 ++++++-- src/share/table/ob_table_load_sql_statistics.h | 11 ++++++++--- .../engine/cmd/ob_load_data_direct_impl.cpp | 7 ++++++- src/sql/engine/cmd/ob_load_data_direct_impl.h | 7 +++---- .../direct_load/ob_direct_load_data_fuse.cpp | 4 ++-- .../ob_direct_load_external_fragment.cpp | 1 + ...ect_load_external_multi_partition_table.cpp | 2 +- .../ob_direct_load_external_scanner.h | 5 +++-- .../ob_direct_load_fast_heap_table.cpp | 3 ++- .../ob_direct_load_fast_heap_table_builder.cpp | 3 ++- .../ob_direct_load_insert_table_ctx.cpp | 6 +++++- .../direct_load/ob_direct_load_mem_chunk.h | 4 ++-- .../direct_load/ob_direct_load_mem_context.cpp | 4 +++- .../direct_load/ob_direct_load_mem_context.h | 6 +++++- .../direct_load/ob_direct_load_mem_dump.cpp | 8 +++++++- .../direct_load/ob_direct_load_mem_sample.cpp | 6 +++++- .../direct_load/ob_direct_load_merge_ctx.cpp | 11 +++++++++-- .../direct_load/ob_direct_load_merge_ctx.h | 14 +++++++------- .../direct_load/ob_direct_load_multi_map.h | 8 ++++++-- .../ob_direct_load_multiple_heap_table.cpp | 4 +++- ...rect_load_multiple_heap_table_compactor.cpp | 5 ++++- ...ad_multiple_heap_table_index_scan_merge.cpp | 2 +- .../ob_direct_load_multiple_heap_table_map.cpp | 2 +- ..._direct_load_multiple_heap_table_sorter.cpp | 5 ++++- .../ob_direct_load_multiple_sstable.cpp | 5 +++-- ...ob_direct_load_multiple_sstable_builder.cpp | 4 ++-- ..._direct_load_multiple_sstable_compactor.cpp | 5 +++-- ...direct_load_multiple_sstable_scan_merge.cpp | 4 +++- ...b_direct_load_multiple_sstable_scan_merge.h | 2 +- ...ob_direct_load_multiple_sstable_scanner.cpp | 2 +- .../ob_direct_load_origin_table.cpp | 4 +++- .../ob_direct_load_partition_merge_task.cpp | 3 ++- .../ob_direct_load_range_splitter.cpp | 18 +++++++++++++----- .../ob_direct_load_range_splitter.h | 8 ++++---- .../direct_load/ob_direct_load_sstable.cpp | 4 ++-- .../direct_load/ob_direct_load_sstable.h | 1 + .../ob_direct_load_sstable_builder.cpp | 11 +++++------ .../ob_direct_load_sstable_builder.h | 8 +++++++- .../ob_direct_load_sstable_compactor.cpp | 5 +++-- .../ob_direct_load_sstable_scan_merge.cpp | 4 +++- .../ob_direct_load_sstable_scan_merge.h | 2 +- .../ob_direct_load_sstable_scanner.cpp | 10 ++++++---- .../ob_direct_load_table_builder_allocator.h | 4 +--- .../direct_load/ob_direct_load_table_store.cpp | 4 ++-- .../direct_load/ob_direct_load_table_store.h | 6 +++++- .../direct_load/ob_direct_load_tmp_file.cpp | 1 + 77 files changed, 302 insertions(+), 141 deletions(-) diff --git a/src/observer/table_load/ob_table_load_client_service.cpp b/src/observer/table_load/ob_table_load_client_service.cpp index 907c0dfe58..1fcb5e9dee 100644 --- a/src/observer/table_load/ob_table_load_client_service.cpp +++ b/src/observer/table_load/ob_table_load_client_service.cpp @@ -769,6 +769,7 @@ void ObTableLoadClientService::purge_client_task() LOG_WARN("ObTableLoadClientService not init", KR(ret), KP(this)); } else { ObArray client_task_array; + client_task_array.set_tenant_id(MTL_ID()); if (OB_FAIL(get_all_client_task(client_task_array))) { LOG_WARN("fail to get all client task", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_client_task.cpp b/src/observer/table_load/ob_table_load_client_task.cpp index 6cb2d3eb52..77948fde91 100644 --- a/src/observer/table_load/ob_table_load_client_task.cpp +++ b/src/observer/table_load/ob_table_load_client_task.cpp @@ -45,6 +45,8 @@ ObTableLoadClientTask::ObTableLoadClientTask() is_inited_(false) { allocator_.set_tenant_id(MTL_ID()); + column_names_.set_tenant_id(MTL_ID()); + column_idxs_.set_tenant_id(MTL_ID()); free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID; } @@ -156,7 +158,8 @@ int ObTableLoadClientTask::create_session_info(uint64_t user_id, uint64_t databa } else if (OB_FAIL(ObTableLoadUtils::create_session_info(session_info, free_session_ctx))) { LOG_WARN("create session id failed", KR(ret)); } else { - common::ObArenaAllocator allocator; + ObArenaAllocator allocator("TLD_Tmp"); + allocator.set_tenant_id(MTL_ID()); ObStringBuffer buffer(&allocator); buffer.append("DIRECT LOAD_"); buffer.append(table_schema->get_table_name()); diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index 5bc147f787..a496b9c625 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -119,6 +119,7 @@ int ObTableLoadCoordinator::abort_active_trans(ObTableLoadTableCtx *ctx) { int ret = OB_SUCCESS; ObArray trans_id_array; + trans_id_array.set_tenant_id(MTL_ID()); if (OB_FAIL(ctx->coordinator_ctx_->get_active_trans_ids(trans_id_array))) { LOG_WARN("fail to get active trans ids", KR(ret)); } @@ -157,6 +158,8 @@ int ObTableLoadCoordinator::abort_peers_ctx(ObTableLoadTableCtx *ctx) int64_t tries = 0; ObDirectLoadControlAbortArg arg; ObDirectLoadControlAbortRes res; + addr_array1.set_tenant_id(MTL_ID()); + addr_array2.set_tenant_id(MTL_ID()); arg.table_id_ = ctx->param_.table_id_; arg.task_id_ = ctx->ddl_param_.task_id_; for (int64_t i = 0; i < all_addr_array.count(); ++i) { diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp index b63bc952f8..6db9c84cae 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.cpp +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.cpp @@ -46,6 +46,9 @@ ObTableLoadCoordinatorCtx::ObTableLoadCoordinatorCtx(ObTableLoadTableCtx *ctx) enable_heart_beat_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + idx_array_.set_tenant_id(MTL_ID()); + commited_trans_ctx_array_.set_tenant_id(MTL_ID()); } ObTableLoadCoordinatorCtx::~ObTableLoadCoordinatorCtx() @@ -105,7 +108,6 @@ int ObTableLoadCoordinatorCtx::init(const ObIArray &idx_array, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(ctx_->param_), K(idx_array.count()), KPC(exec_ctx)); } else { - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(target_schema_.init(ctx_->param_.tenant_id_, ctx_->ddl_param_.dest_table_id_))) { LOG_WARN("fail to init table load schema", KR(ret), K(ctx_->param_.tenant_id_), K(ctx_->ddl_param_.dest_table_id_)); diff --git a/src/observer/table_load/ob_table_load_coordinator_ctx.h b/src/observer/table_load/ob_table_load_coordinator_ctx.h index 2ff135cd28..d2fc666aa7 100644 --- a/src/observer/table_load/ob_table_load_coordinator_ctx.h +++ b/src/observer/table_load/ob_table_load_coordinator_ctx.h @@ -173,7 +173,7 @@ private: TransMap trans_map_; TransCtxMap trans_ctx_map_; SegmentCtxMap segment_ctx_map_; - common::ObSEArray commited_trans_ctx_array_; + common::ObArray commited_trans_ctx_array_; bool enable_heart_beat_; bool is_inited_; }; diff --git a/src/observer/table_load/ob_table_load_general_table_compactor.cpp b/src/observer/table_load/ob_table_load_general_table_compactor.cpp index 145c079644..e5c4f137fa 100644 --- a/src/observer/table_load/ob_table_load_general_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_general_table_compactor.cpp @@ -159,6 +159,7 @@ void ObTableLoadGeneralTableCompactor::CompactorTask::stop() ObTableLoadGeneralTableCompactor::CompactorTaskIter::CompactorTaskIter() : pos_(0) { + compactor_task_array_.set_tenant_id(MTL_ID()); } ObTableLoadGeneralTableCompactor::CompactorTaskIter::~CompactorTaskIter() @@ -207,11 +208,12 @@ ObTableLoadGeneralTableCompactor::ObTableLoadGeneralTableCompactor() : store_ctx_(nullptr), param_(nullptr), allocator_("TLD_GeneralTC"), - all_compactor_array_(OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator(allocator_)), running_thread_count_(0), has_error_(false), is_stop_(false) { + allocator_.set_tenant_id(MTL_ID()); + all_compactor_array_.set_tenant_id(MTL_ID()); } ObTableLoadGeneralTableCompactor::~ObTableLoadGeneralTableCompactor() @@ -241,7 +243,6 @@ int ObTableLoadGeneralTableCompactor::inner_init() int ret = OB_SUCCESS; store_ctx_ = compact_ctx_->store_ctx_; param_ = &store_ctx_->ctx_->param_; - allocator_.set_tenant_id(MTL_ID()); return ret; } @@ -277,9 +278,11 @@ void ObTableLoadGeneralTableCompactor::stop() int ObTableLoadGeneralTableCompactor::construct_compactors() { int ret = OB_SUCCESS; - ObArenaAllocator allocator; + ObArenaAllocator allocator("TLD_Tmp"); CompactorTaskMap *compactor_task_map_array = nullptr; - ObSEArray trans_store_array; + ObArray trans_store_array; + allocator.set_tenant_id(MTL_ID()); + trans_store_array.set_block_allocator(ModulePageAllocator(allocator)); if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { LOG_WARN("fail to get committed trans stores", KR(ret)); } else if (OB_ISNULL(compactor_task_map_array = static_cast( diff --git a/src/observer/table_load/ob_table_load_general_table_compactor.h b/src/observer/table_load/ob_table_load_general_table_compactor.h index 4874805e5b..f429bd2d87 100644 --- a/src/observer/table_load/ob_table_load_general_table_compactor.h +++ b/src/observer/table_load/ob_table_load_general_table_compactor.h @@ -57,7 +57,7 @@ private: int add(CompactorTask *compactor_task); int get_next_compactor_task(CompactorTask *&compactor_task); private: - common::ObSEArray compactor_task_array_; + common::ObArray compactor_task_array_; int64_t pos_; }; int add_tablet_table(int32_t session_id, CompactorTaskMap &compactor_task_map, @@ -77,7 +77,7 @@ private: ObTableLoadStoreCtx *store_ctx_; const ObTableLoadParam *param_; common::ObArenaAllocator allocator_; - common::ObSEArray all_compactor_array_; + common::ObArray all_compactor_array_; mutable lib::ObMutex mutex_; CompactorTaskIter compactor_task_iter_; common::ObDList compacting_list_; diff --git a/src/observer/table_load/ob_table_load_mem_compactor.cpp b/src/observer/table_load/ob_table_load_mem_compactor.cpp index 80d2965f8e..c740a5c3a3 100644 --- a/src/observer/table_load/ob_table_load_mem_compactor.cpp +++ b/src/observer/table_load/ob_table_load_mem_compactor.cpp @@ -238,6 +238,7 @@ ObTableLoadMemCompactor::ObTableLoadMemCompactor() task_scheduler_(nullptr), parallel_merge_cb_(this) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadMemCompactor::~ObTableLoadMemCompactor() @@ -268,7 +269,6 @@ int ObTableLoadMemCompactor::inner_init() const uint64_t tenant_id = MTL_ID(); store_ctx_ = compact_ctx_->store_ctx_; param_ = &(store_ctx_->ctx_->param_); - allocator_.set_tenant_id(tenant_id); if (OB_FAIL(init_scheduler())) { LOG_WARN("fail to init_scheduler", KR(ret)); } else { @@ -331,7 +331,8 @@ int ObTableLoadMemCompactor::start() int ObTableLoadMemCompactor::construct_compactors() { int ret = OB_SUCCESS; - ObSEArray trans_store_array; + ObArray trans_store_array; + trans_store_array.set_tenant_id(MTL_ID()); if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { LOG_WARN("fail to get committed trans stores", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_merger.cpp b/src/observer/table_load/ob_table_load_merger.cpp index 8d9e912241..6e9062fcb2 100644 --- a/src/observer/table_load/ob_table_load_merger.cpp +++ b/src/observer/table_load/ob_table_load_merger.cpp @@ -314,6 +314,7 @@ int ObTableLoadMerger::build_merge_ctx() // for optimize split range is too slow ObArray multiple_sstable_array; ObDirectLoadMultipleMergeRangeSplitter range_splitter; + multiple_sstable_array.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && i < table_array->count(); ++i) { ObDirectLoadMultipleSSTable *sstable = nullptr; if (OB_ISNULL(sstable = dynamic_cast(table_array->at(i)))) { @@ -401,6 +402,7 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) if (store_ctx_->is_fast_heap_table_) { ObDirectLoadMultiMap tables; ObArray trans_store_array; + trans_store_array.set_tenant_id(MTL_ID()); if (OB_FAIL(tables.init())) { LOG_WARN("fail to init table", KR(ret)); } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { @@ -427,7 +429,8 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) } for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; + ObArray heap_table_array; + heap_table_array.set_tenant_id(MTL_ID()); if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) { LOG_WARN("get heap sstable failed", KR(ret)); } else if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) { @@ -438,8 +441,8 @@ int ObTableLoadMerger::collect_dml_stat(ObTableLoadDmlStat &dml_stats) } else { for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; - if (OB_FAIL(tablet_ctx->collect_dml_stat(heap_table_array, dml_stats))) { + ObArray empty_heap_table_array; + if (OB_FAIL(tablet_ctx->collect_dml_stat(empty_heap_table_array, dml_stats))) { LOG_WARN("fail to collect sql statics", KR(ret)); } } @@ -453,6 +456,7 @@ int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_stat if (store_ctx_->is_fast_heap_table_) { ObDirectLoadMultiMap tables; ObArray trans_store_array; + trans_store_array.set_tenant_id(MTL_ID()); if (OB_FAIL(tables.init())) { LOG_WARN("fail to init table", KR(ret)); } else if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { @@ -479,7 +483,8 @@ int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_stat } for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; + ObArray heap_table_array; + heap_table_array.set_tenant_id(MTL_ID()); if (OB_FAIL(tables.get(tablet_ctx->get_tablet_id(), heap_table_array))) { LOG_WARN("get heap sstable failed", KR(ret)); } else if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { @@ -490,7 +495,8 @@ int ObTableLoadMerger::collect_sql_statistics(ObTableLoadSqlStatistics &sql_stat } else { for (int i = 0; OB_SUCC(ret) && i < merge_ctx_.get_tablet_merge_ctxs().count(); ++i) { ObDirectLoadTabletMergeCtx *tablet_ctx = merge_ctx_.get_tablet_merge_ctxs().at(i); - ObArray heap_table_array ; + ObArray heap_table_array; + heap_table_array.set_tenant_id(MTL_ID()); if (OB_FAIL(tablet_ctx->collect_sql_statistics(heap_table_array, sql_statistics))) { LOG_WARN("fail to collect sql statics", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp index 697e16df3a..2daa6527c5 100644 --- a/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_multiple_heap_table_compactor.cpp @@ -85,6 +85,7 @@ public: { ctx_->inc_ref_count(); heap_table_allocator_.set_tenant_id(MTL_ID()); + heap_table_array_.set_tenant_id(MTL_ID()); } virtual ~CompactTaskProcessor() { @@ -142,6 +143,7 @@ private: ObArray *next_round = &tmp_heap_table_array; ObDirectLoadMultipleHeapTableCompactParam heap_table_compact_param; ObDirectLoadMultipleHeapTableCompactor heap_table_compactor; + tmp_heap_table_array.set_tenant_id(MTL_ID()); heap_table_compact_param.table_data_desc_ = mem_ctx_->table_data_desc_; heap_table_compact_param.file_mgr_ = mem_ctx_->file_mgr_; heap_table_compact_param.index_dir_id_ = index_dir_id_; @@ -294,6 +296,7 @@ ObTableLoadMultipleHeapTableCompactor::ObTableLoadMultipleHeapTableCompactor() allocator_("TLD_MemC"), finish_task_count_(0) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadMultipleHeapTableCompactor::~ObTableLoadMultipleHeapTableCompactor() @@ -316,7 +319,6 @@ int ObTableLoadMultipleHeapTableCompactor::inner_init() const uint64_t tenant_id = MTL_ID(); store_ctx_ = compact_ctx_->store_ctx_; param_ = &(store_ctx_->ctx_->param_); - allocator_.set_tenant_id(tenant_id); mem_ctx_.mem_dump_task_count_ = param_->session_count_ / 3; //暂时先写成1/3,后续再优化 if (mem_ctx_.mem_dump_task_count_ == 0) { @@ -360,7 +362,8 @@ int ObTableLoadMultipleHeapTableCompactor::start() int ObTableLoadMultipleHeapTableCompactor::construct_compactors() { int ret = OB_SUCCESS; - ObSEArray trans_store_array; + ObArray trans_store_array; + trans_store_array.set_tenant_id(MTL_ID()); if (OB_FAIL(store_ctx_->get_committed_trans_stores(trans_store_array))) { LOG_WARN("fail to get committed trans stores", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp index 887ab2236b..d031d32c95 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.cpp @@ -77,6 +77,8 @@ ObTableLoadParallelMergeTabletCtx::ObTableLoadParallelMergeTabletCtx() { allocator_.set_tenant_id(MTL_ID()); range_allocator_.set_tenant_id(MTL_ID()); + ranges_.set_tenant_id(MTL_ID()); + range_sstables_.set_tenant_id(MTL_ID()); } ObTableLoadParallelMergeTabletCtx::~ObTableLoadParallelMergeTabletCtx() @@ -229,7 +231,8 @@ public: } else { const int64_t merge_sstable_count = MIN(tablet_ctx_->sstables_.size() - merge_count_per_round + 1, merge_count_per_round); - ObSEArray sstable_array; + ObArray sstable_array; + sstable_array.set_tenant_id(MTL_ID()); // sort sstable ObTableLoadParallelMergeSSTableCompare compare; std::sort(tablet_ctx_->sstables_.begin(), tablet_ctx_->sstables_.end(), compare); @@ -293,6 +296,7 @@ public: extra_buf_size_(0) { ctx_->inc_ref_count(); + sstable_array_.set_tenant_id(MTL_ID()); } virtual ~MergeRangeTaskProcessor() { @@ -406,7 +410,7 @@ private: ObTableLoadParallelMergeCtx *parallel_merge_ctx_; ObTableLoadParallelMergeTabletCtx *tablet_ctx_; int64_t range_idx_; - ObSEArray sstable_array_; + ObArray sstable_array_; ObDirectLoadMultipleSSTableScanMerge scan_merge_; char *extra_buf_; int64_t extra_buf_size_; @@ -539,6 +543,9 @@ ObTableLoadParallelMergeCtx::ObTableLoadParallelMergeCtx() is_inited_(false) { allocator_.set_tenant_id(MTL_ID()); + light_task_list_.set_tenant_id(MTL_ID()); + heavy_task_list_.set_tenant_id(MTL_ID()); + idle_thread_list_.set_tenant_id(MTL_ID()); } ObTableLoadParallelMergeCtx::~ObTableLoadParallelMergeCtx() diff --git a/src/observer/table_load/ob_table_load_parallel_merge_ctx.h b/src/observer/table_load/ob_table_load_parallel_merge_ctx.h index b6a3e711a0..a75bf758b8 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_ctx.h +++ b/src/observer/table_load/ob_table_load_parallel_merge_ctx.h @@ -58,8 +58,8 @@ public: int64_t range_sstable_count_; lib::ObMutex mutex_; // for alloc range sstable common::ObArenaAllocator range_allocator_; // for alloc range and range sstable - common::ObSEArray ranges_; - common::ObSEArray range_sstables_; + common::ObArray ranges_; + common::ObArray range_sstables_; }; class ObTableLoadParallelMergeCb diff --git a/src/observer/table_load/ob_table_load_parallel_merge_table_compactor.cpp b/src/observer/table_load/ob_table_load_parallel_merge_table_compactor.cpp index 19ab79a4ce..52b203a2cf 100644 --- a/src/observer/table_load/ob_table_load_parallel_merge_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_parallel_merge_table_compactor.cpp @@ -92,7 +92,8 @@ int ObTableLoadParallelMergeTableCompactor::start() ret = OB_NOT_INIT; LOG_WARN("ObTableLoadParallelMergeTableCompactor not init", KR(ret), KP(this)); } else { - ObSEArray trans_store_array; + ObArray trans_store_array; + trans_store_array.set_tenant_id(MTL_ID()); if (OB_FAIL(compact_ctx_->store_ctx_->get_committed_trans_stores(trans_store_array))) { LOG_WARN("fail to get committed trans stores", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_partition_calc.cpp b/src/observer/table_load/ob_table_load_partition_calc.cpp index d8033e4c68..9409a7417a 100644 --- a/src/observer/table_load/ob_table_load_partition_calc.cpp +++ b/src/observer/table_load/ob_table_load_partition_calc.cpp @@ -40,6 +40,7 @@ ObTableLoadPartitionCalc::ObTableLoadPartitionCalc() exec_ctx_(allocator_), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, sql::ObSQLSessionInfo *session_info) @@ -51,7 +52,6 @@ int ObTableLoadPartitionCalc::init(const ObTableLoadParam ¶m, sql::ObSQLSess } else { uint64_t tenant_id = param.tenant_id_; uint64_t table_id = param.table_id_; - allocator_.set_tenant_id(tenant_id); sql_ctx_.schema_guard_ = &schema_guard_; exec_ctx_.set_sql_ctx(&sql_ctx_); const ObTableSchema *table_schema = nullptr; @@ -94,7 +94,8 @@ int ObTableLoadPartitionCalc::init_part_key_index(const ObTableSchema *table_sch ObIAllocator &allocator) { int ret = OB_SUCCESS; - ObSEArray column_descs; + ObArray column_descs; + column_descs.set_tenant_id(MTL_ID()); if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) { LOG_WARN("fail to get column ids", KR(ret)); } else if (OB_UNLIKELY(column_descs.empty())) { @@ -193,6 +194,8 @@ int ObTableLoadPartitionCalc::get_partition_by_row( int ret = OB_SUCCESS; ObArray tablet_ids; ObArray part_ids; + tablet_ids.set_tenant_id(MTL_ID()); + part_ids.set_tenant_id(MTL_ID()); if (OB_FAIL(table_location_.calculate_partition_ids_by_rows2( *session_info_, schema_guard_, param_->table_id_, part_rows, tablet_ids, part_ids))) { LOG_WARN("fail to calc partition id", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_partition_location.cpp b/src/observer/table_load/ob_table_load_partition_location.cpp index ffe5f09713..c6c935b079 100644 --- a/src/observer/table_load/ob_table_load_partition_location.cpp +++ b/src/observer/table_load/ob_table_load_partition_location.cpp @@ -91,6 +91,7 @@ int ObTableLoadPartitionLocation::fetch_ls_locations(uint64_t tenant_id, { int ret = OB_SUCCESS; ObArray ls_ids; + ls_ids.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && (i < partition_ids.count()); ++i) { const ObTabletID &tablet_id = partition_ids[i].tablet_id_; @@ -267,18 +268,22 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) if (OB_UNLIKELY(OB_HASH_NOT_EXIST != ret)) { LOG_WARN("fail to get refactored", KR(ret), K(addr)); } else { - if (OB_ISNULL(partition_id_array = + ObArray *new_array = nullptr; + if (OB_ISNULL(new_array = OB_NEWx(ObArray, (&tmp_allocator)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to new array", KR(ret)); - } else if (OB_FAIL(addr_map.set_refactored(addr, partition_id_array))) { + } else if (OB_FAIL(addr_map.set_refactored(addr, new_array))) { LOG_WARN("fail to set refactored", KR(ret), K(addr)); + } else { + new_array->set_tenant_id(MTL_ID()); + partition_id_array = new_array; } if (OB_FAIL(ret)) { - if (nullptr != partition_id_array) { - partition_id_array->~ObIArray(); - tmp_allocator.free(partition_id_array); - partition_id_array = nullptr; + if (nullptr != new_array) { + new_array->~ObArray(); + tmp_allocator.free(new_array); + new_array = nullptr; } } } @@ -300,6 +305,7 @@ int ObTableLoadPartitionLocation::init_all_leader_info(ObIAllocator &allocator) } } ObArray sort_array; + sort_array.set_tenant_id(MTL_ID()); for (addr_iter = addr_map.begin(); OB_SUCC(ret) && addr_iter != addr_map.end(); ++pos, ++addr_iter) { LeaderInfoForSort item; item.addr_ = addr_iter->first; diff --git a/src/observer/table_load/ob_table_load_partition_location.h b/src/observer/table_load/ob_table_load_partition_location.h index f49c400660..a5e0679a65 100644 --- a/src/observer/table_load/ob_table_load_partition_location.h +++ b/src/observer/table_load/ob_table_load_partition_location.h @@ -54,7 +54,7 @@ public: TO_STRING_KV(K_(addr), KP_(partition_id_array_ptr)); }; public: - ObTableLoadPartitionLocation() : is_inited_(false) {} + ObTableLoadPartitionLocation() : is_inited_(false) { tablet_ids_.set_tenant_id(MTL_ID()); } int init(uint64_t tenant_id, const table::ObTableLoadArray &partition_ids, common::ObIAllocator &allocator); diff --git a/src/observer/table_load/ob_table_load_schema.cpp b/src/observer/table_load/ob_table_load_schema.cpp index fadcfd9604..e305eea688 100644 --- a/src/observer/table_load/ob_table_load_schema.cpp +++ b/src/observer/table_load/ob_table_load_schema.cpp @@ -152,7 +152,8 @@ int ObTableLoadSchema::get_column_idxs(const ObTableSchema *table_schema, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KP(table_schema)); } else { - ObSEArray column_descs; + ObArray column_descs; + column_descs.set_tenant_id(MTL_ID()); if (OB_FAIL(table_schema->get_column_ids(column_descs, false))) { LOG_WARN("fail to get column ids", KR(ret)); } @@ -205,6 +206,7 @@ ObTableLoadSchema::ObTableLoadSchema() schema_version_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); column_descs_.set_block_allocator(ModulePageAllocator(allocator_)); multi_version_column_descs_.set_block_allocator(ModulePageAllocator(allocator_)); } @@ -243,7 +245,6 @@ int ObTableLoadSchema::init(uint64_t tenant_id, uint64_t table_id) ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadSchema init twice", KR(ret)); } else { - allocator_.set_tenant_id(tenant_id); ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = nullptr; if (OB_FAIL(get_table_schema(tenant_id, table_id, schema_guard, table_schema))) { @@ -295,6 +296,8 @@ int ObTableLoadSchema::init_table_schema(const ObTableSchema *table_schema) if (OB_SUCC(ret)) { ObArray tablet_ids; ObArray part_ids; + tablet_ids.set_tenant_id(MTL_ID()); + part_ids.set_tenant_id(MTL_ID()); if (OB_FAIL(table_schema->get_all_tablet_and_object_ids(tablet_ids, part_ids))) { LOG_WARN("fail to get all tablet ids", KR(ret)); } else if (OB_FAIL(partition_ids_.create(part_ids.count(), allocator_))) { diff --git a/src/observer/table_load/ob_table_load_service.cpp b/src/observer/table_load/ob_table_load_service.cpp index fd732e0132..ad47cd7b3a 100644 --- a/src/observer/table_load/ob_table_load_service.cpp +++ b/src/observer/table_load/ob_table_load_service.cpp @@ -97,6 +97,7 @@ void ObTableLoadService::ObHeartBeatTask::runTimerTask() LOG_DEBUG("table load heart beat", K(tenant_id_)); ObTableLoadManager &manager = service_.get_manager(); ObArray table_ctx_array; + table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(manager.get_all_table_ctx(table_ctx_array))) { LOG_WARN("fail to get all table ctx", KR(ret), K(tenant_id_)); } @@ -142,6 +143,7 @@ void ObTableLoadService::ObGCTask::runTimerTask() LOG_DEBUG("table load start gc", K(tenant_id_)); ObTableLoadManager &manager = service_.get_manager(); ObArray table_ctx_array; + table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(manager.get_all_table_ctx(table_ctx_array))) { LOG_WARN("fail to get all table ctx", KR(ret), K(tenant_id_)); } @@ -263,6 +265,7 @@ void ObTableLoadService::ObReleaseTask::runTimerTask() } else { LOG_DEBUG("table load start release", K(tenant_id_)); ObArray releasable_table_ctx_array; + releasable_table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(service_.manager_.get_releasable_table_ctx_list(releasable_table_ctx_array))) { LOG_WARN("fail to get releasable table ctx list", KR(ret), K(tenant_id_)); } @@ -304,6 +307,7 @@ void ObTableLoadService::ObClientTaskAutoAbortTask::runTimerTask() } else { LOG_DEBUG("table load auto abort client task", K(tenant_id_)); ObArray client_task_array; + client_task_array.set_tenant_id(MTL_ID()); if (OB_FAIL(service_.get_client_service().get_all_client_task(client_task_array))) { LOG_WARN("fail to get all client task", KR(ret)); } else { @@ -631,6 +635,7 @@ void ObTableLoadService::abort_all_client_task() { int ret = OB_SUCCESS; ObArray client_task_array; + client_task_array.set_tenant_id(MTL_ID()); if (OB_FAIL(client_service_.get_all_client_task(client_task_array))) { LOG_WARN("fail to get all client task", KR(ret)); } else { @@ -648,6 +653,7 @@ void ObTableLoadService::fail_all_ctx(int error_code) { int ret = OB_SUCCESS; ObArray table_ctx_array; + table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(manager_.get_all_table_ctx(table_ctx_array))) { LOG_WARN("fail to get all table ctx list", KR(ret)); } else { @@ -679,6 +685,7 @@ void ObTableLoadService::release_all_ctx() abort_all_client_task(); fail_all_ctx(OB_ERR_UNEXPECTED_UNIT_STATUS); ObArray table_ctx_array; + table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(manager_.get_inactive_table_ctx_list(table_ctx_array))) { LOG_WARN("fail to get inactive table ctx list", KR(ret), K(tenant_id)); } else { @@ -715,6 +722,7 @@ void ObTableLoadService::release_all_ctx() LOG_INFO("[DIRECT LOAD DIRTY LIST]", "count", manager_.get_dirty_list_count()); } ObArray table_ctx_array; + table_ctx_array.set_tenant_id(MTL_ID()); if (OB_FAIL(manager_.get_releasable_table_ctx_list(table_ctx_array))) { LOG_WARN("fail to get releasable table ctx list", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_store.cpp b/src/observer/table_load/ob_table_load_store.cpp index 6a6c49be96..c8f80d506d 100644 --- a/src/observer/table_load/ob_table_load_store.cpp +++ b/src/observer/table_load/ob_table_load_store.cpp @@ -88,6 +88,7 @@ int ObTableLoadStore::abort_active_trans(ObTableLoadTableCtx *ctx) { int ret = OB_SUCCESS; ObArray trans_id_array; + trans_id_array.set_tenant_id(MTL_ID()); if (OB_FAIL(ctx->store_ctx_->get_active_trans_ids(trans_id_array))) { LOG_WARN("fail to get active trans ids", KR(ret)); } @@ -221,9 +222,10 @@ int ObTableLoadStore::pre_merge( LOG_WARN("ObTableLoadStore not init", KR(ret), KP(this)); } else { LOG_INFO("store pre merge"); - ObArenaAllocator allocator; + ObArenaAllocator allocator("TLD_Tmp"); bool trans_exist = false; ObTableLoadArray store_committed_trans_id_array; + allocator.set_tenant_id(MTL_ID()); // 1. 冻结状态, 防止后续继续创建trans if (OB_FAIL(store_ctx_->set_status_frozen())) { LOG_WARN("fail to set store status frozen", KR(ret)); @@ -339,8 +341,10 @@ int ObTableLoadStore::commit_sql_statistics(const ObTableLoadSqlStatistics &sql_ const uint64_t table_id = param_.table_id_; ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = nullptr; - ObSEArray part_column_stats; - ObSEArray part_table_stats; + ObArray part_column_stats; + ObArray part_table_stats; + part_column_stats.set_tenant_id(MTL_ID()); + part_table_stats.set_tenant_id(MTL_ID()); if (sql_statistics.is_empty()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("sql statistics is empty", K(ret)); diff --git a/src/observer/table_load/ob_table_load_store_ctx.cpp b/src/observer/table_load/ob_table_load_store_ctx.cpp index 427b932e5b..52666a218e 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.cpp +++ b/src/observer/table_load/ob_table_load_store_ctx.cpp @@ -44,7 +44,7 @@ using namespace share; ObTableLoadStoreCtx::ObTableLoadStoreCtx(ObTableLoadTableCtx *ctx) : ctx_(ctx), - allocator_("TLD_StoreCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, ctx->param_.tenant_id_), + allocator_("TLD_StoreCtx"), task_scheduler_(nullptr), merger_(nullptr), insert_table_ctx_(nullptr), @@ -60,6 +60,10 @@ ObTableLoadStoreCtx::ObTableLoadStoreCtx(ObTableLoadTableCtx *ctx) enable_heart_beat_check_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + ls_partition_ids_.set_tenant_id(MTL_ID()); + target_ls_partition_ids_.set_tenant_id(MTL_ID()); + committed_trans_store_array_.set_tenant_id(MTL_ID()); } ObTableLoadStoreCtx::~ObTableLoadStoreCtx() diff --git a/src/observer/table_load/ob_table_load_store_ctx.h b/src/observer/table_load/ob_table_load_store_ctx.h index 1df46bda50..cd7723aecf 100644 --- a/src/observer/table_load/ob_table_load_store_ctx.h +++ b/src/observer/table_load/ob_table_load_store_ctx.h @@ -187,7 +187,7 @@ private: TransMap trans_map_; TransCtxMap trans_ctx_map_; SegmentCtxMap segment_ctx_map_; - common::ObSEArray committed_trans_store_array_; + common::ObArray committed_trans_store_array_; uint64_t last_heart_beat_ts_; bool enable_heart_beat_check_; bool is_inited_; diff --git a/src/observer/table_load/ob_table_load_table_compactor.cpp b/src/observer/table_load/ob_table_load_table_compactor.cpp index ffcaf501ea..ce765fade0 100644 --- a/src/observer/table_load/ob_table_load_table_compactor.cpp +++ b/src/observer/table_load/ob_table_load_table_compactor.cpp @@ -36,6 +36,8 @@ using namespace storage; ObTableLoadTableCompactResult::ObTableLoadTableCompactResult() : allocator_("TLD_TCResult"), tablet_result_map_(64) { + allocator_.set_tenant_id(MTL_ID()); + all_table_array_.set_tenant_id(MTL_ID()); } ObTableLoadTableCompactResult::~ObTableLoadTableCompactResult() @@ -58,7 +60,6 @@ void ObTableLoadTableCompactResult::reset() int ObTableLoadTableCompactResult::init() { int ret = OB_SUCCESS; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(tablet_result_map_.init("TLD_TCResult", MTL_ID()))) { LOG_WARN("fail to init link hash map", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_table_compactor.h b/src/observer/table_load/ob_table_load_table_compactor.h index bc3ac4bb47..15f3ccabc4 100644 --- a/src/observer/table_load/ob_table_load_table_compactor.h +++ b/src/observer/table_load/ob_table_load_table_compactor.h @@ -27,8 +27,11 @@ class ObTableLoadTableCompactor; struct ObTableLoadTableCompactTabletResult : public common::LinkHashValue { - common::ObSEArray table_array_; +public: + ObTableLoadTableCompactTabletResult() { table_array_.set_tenant_id(MTL_ID()); } TO_STRING_KV(K_(table_array)); +public: + common::ObArray table_array_; }; struct ObTableLoadTableCompactResult diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 56882e62ec..fd86915d62 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -42,6 +42,7 @@ ObTableLoadTableCtx::ObTableLoadTableCtx() is_inited_(false) { free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID; + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadTableCtx::~ObTableLoadTableCtx() @@ -62,7 +63,6 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD } else { param_ = param; ddl_param_ = ddl_param; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(schema_.init(param_.tenant_id_, param_.table_id_))) { LOG_WARN("fail to init table load schema", KR(ret), K(param_.tenant_id_), K(param_.table_id_)); diff --git a/src/observer/table_load/ob_table_load_task.cpp b/src/observer/table_load/ob_table_load_task.cpp index 629b8f9aa2..bf8f4ba9f6 100644 --- a/src/observer/table_load/ob_table_load_task.cpp +++ b/src/observer/table_load/ob_table_load_task.cpp @@ -22,10 +22,11 @@ using namespace common; ObTableLoadTask::ObTableLoadTask(uint64_t tenant_id) : trace_id_(*ObCurTraceId::get_trace_id()), - allocator_("TLD_Task", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), + allocator_("TLD_Task"), processor_(nullptr), callback_(nullptr) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadTask::~ObTableLoadTask() diff --git a/src/observer/table_load/ob_table_load_task_scheduler.cpp b/src/observer/table_load/ob_table_load_task_scheduler.cpp index 2e5e77bab0..9d6a94d176 100644 --- a/src/observer/table_load/ob_table_load_task_scheduler.cpp +++ b/src/observer/table_load/ob_table_load_task_scheduler.cpp @@ -68,6 +68,7 @@ ObTableLoadTaskThreadPoolScheduler::ObTableLoadTaskThreadPoolScheduler(int64_t t state_(STATE_ZERO), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); snprintf(name_, OB_THREAD_NAME_BUF_LEN, "TLD_%03ld_%s", table_id % 1000, label); } @@ -112,7 +113,6 @@ int ObTableLoadTaskThreadPoolScheduler::init() ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadTaskThreadPoolScheduler init twice", KR(ret), KP(this)); } else { - allocator_.set_tenant_id(MTL_ID()); thread_pool_.set_thread_count(thread_count_); thread_pool_.set_run_wrapper(MTL_CTX()); ObCurTraceId::TraceId *cur_trace_id = ObCurTraceId::get_trace_id(); diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp index 32ae341064..5417594299 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.cpp @@ -40,6 +40,8 @@ using namespace table; ObTableLoadTransBucketWriter::SessionContext::SessionContext() : session_id_(0), allocator_("TLD_TB_SessCtx"), last_receive_sequence_no_(0) { + allocator_.set_tenant_id(MTL_ID()); + load_bucket_array_.set_tenant_id(MTL_ID()); } ObTableLoadTransBucketWriter::SessionContext::~SessionContext() @@ -62,13 +64,14 @@ ObTableLoadTransBucketWriter::ObTableLoadTransBucketWriter(ObTableLoadTransCtx * : trans_ctx_(trans_ctx), coordinator_ctx_(trans_ctx_->ctx_->coordinator_ctx_), param_(trans_ctx_->ctx_->param_), - allocator_("TLD_TBWriter", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_), + allocator_("TLD_TBWriter"), is_partitioned_(false), session_ctx_array_(nullptr), ref_count_(0), is_flush_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObTableLoadTransBucketWriter::~ObTableLoadTransBucketWriter() @@ -115,7 +118,6 @@ int ObTableLoadTransBucketWriter::init_session_ctx_array() for (int64_t i = 0; OB_SUCC(ret) && i < param_.session_count_; ++i) { SessionContext *session_ctx = session_ctx_array_ + i; session_ctx->session_id_ = i + 1; - session_ctx->allocator_.set_tenant_id(param_.tenant_id_); if (!is_partitioned_) { ObTableLoadPartitionLocation::PartitionLocationInfo info; if (OB_UNLIKELY(1 != coordinator_ctx_->ctx_->schema_.partition_ids_.count())) { @@ -212,7 +214,7 @@ int ObTableLoadTransBucketWriter::handle_partition_with_autoinc_identity( { int ret = OB_SUCCESS; const int64_t row_count = obj_rows.count(); - ObArenaAllocator autoinc_allocator("TLD_Autoinc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_); + ObArenaAllocator autoinc_allocator("TLD_Autoinc", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); ObDataTypeCastParams cast_params(coordinator_ctx_->partition_calc_.session_info_->get_timezone_info()); ObCastCtx cast_ctx(&autoinc_allocator, &cast_params, CM_NONE, ObCharset::get_system_collation()); @@ -318,14 +320,17 @@ int ObTableLoadTransBucketWriter::write_for_partitioned(SessionContext &session_ const ObTableLoadObjRowArray &obj_rows) { int ret = OB_SUCCESS; - ObArenaAllocator allocator("TLD_Misc", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_); + ObArenaAllocator allocator("TLD_Misc"); const int64_t part_key_obj_count = coordinator_ctx_->partition_calc_.get_part_key_obj_count(); ObArray partition_ids; ObArray part_keys; ObArray row_idxs; ObTableLoadErrorRowHandler *error_row_handler = coordinator_ctx_->error_row_handler_; + allocator.set_tenant_id(MTL_ID()); partition_ids.set_block_allocator(common::ModulePageAllocator(allocator)); + part_keys.set_block_allocator(common::ModulePageAllocator(allocator)); + row_idxs.set_block_allocator(common::ModulePageAllocator(allocator)); for (int64_t i = 0; OB_SUCC(ret) && i < obj_rows.count(); ++i) { ObNewRow part_key; part_key.count_ = part_key_obj_count; diff --git a/src/observer/table_load/ob_table_load_trans_bucket_writer.h b/src/observer/table_load/ob_table_load_trans_bucket_writer.h index e564cd760a..f8731e1c47 100644 --- a/src/observer/table_load/ob_table_load_trans_bucket_writer.h +++ b/src/observer/table_load/ob_table_load_trans_bucket_writer.h @@ -83,7 +83,7 @@ private: ObTableLoadBucket load_bucket_; // for partitioned table common::hash::ObHashMap load_bucket_map_; - common::ObSEArray load_bucket_array_; + common::ObArray load_bucket_array_; // 以下参数加锁访问 lib::ObMutex mutex_; uint64_t last_receive_sequence_no_; diff --git a/src/observer/table_load/ob_table_load_trans_ctx.cpp b/src/observer/table_load/ob_table_load_trans_ctx.cpp index bd4dac6474..3f4ad07a5e 100644 --- a/src/observer/table_load/ob_table_load_trans_ctx.cpp +++ b/src/observer/table_load/ob_table_load_trans_ctx.cpp @@ -27,10 +27,11 @@ ObTableLoadTransCtx::ObTableLoadTransCtx(ObTableLoadTableCtx *ctx, const ObTableLoadTransId &trans_id) : ctx_(ctx), trans_id_(trans_id), - allocator_("TLD_TCtx", OB_MALLOC_NORMAL_BLOCK_SIZE, ctx->param_.tenant_id_), + allocator_("TLD_TCtx"), trans_status_(ObTableLoadTransStatusType::NONE), error_code_(OB_SUCCESS) { + allocator_.set_tenant_id(MTL_ID()); } int ObTableLoadTransCtx::advance_trans_status(ObTableLoadTransStatusType trans_status) diff --git a/src/observer/table_load/ob_table_load_trans_store.cpp b/src/observer/table_load/ob_table_load_trans_store.cpp index 8b4e664dfb..bb291986ce 100644 --- a/src/observer/table_load/ob_table_load_trans_store.cpp +++ b/src/observer/table_load/ob_table_load_trans_store.cpp @@ -59,9 +59,6 @@ int ObTableLoadTransStore::init() } else { session_store->session_id_ = i + 1; } - session_store->allocator_.set_tenant_id(trans_ctx_->ctx_->param_.tenant_id_); - session_store->partition_table_array_.set_block_allocator( - ModulePageAllocator(session_store->allocator_)); if (OB_FAIL(session_store_array_.push_back(session_store))) { LOG_WARN("fail to push back session store", KR(ret)); } @@ -101,12 +98,13 @@ void ObTableLoadTransStore::reset() ObTableLoadTransStoreWriter::SessionContext::SessionContext(int32_t session_id, uint64_t tenant_id, ObDataTypeCastParams cast_params) : session_id_(session_id), - cast_allocator_("TLD_TS_Caster", OB_MALLOC_NORMAL_BLOCK_SIZE, tenant_id), + cast_allocator_("TLD_TS_Caster"), cast_params_(cast_params), last_receive_sequence_no_(0), extra_buf_(nullptr), extra_buf_size_(0) { + cast_allocator_.set_tenant_id(MTL_ID()); } ObTableLoadTransStoreWriter::SessionContext::~SessionContext() @@ -119,11 +117,13 @@ ObTableLoadTransStoreWriter::ObTableLoadTransStoreWriter(ObTableLoadTransStore * trans_ctx_(trans_store->trans_ctx_), store_ctx_(trans_ctx_->ctx_->store_ctx_), param_(trans_ctx_->ctx_->param_), - allocator_("TLD_TSWriter", OB_MALLOC_NORMAL_BLOCK_SIZE, param_.tenant_id_), + allocator_("TLD_TSWriter"), table_data_desc_(nullptr), ref_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + column_schemas_.set_tenant_id(MTL_ID()); } ObTableLoadTransStoreWriter::~ObTableLoadTransStoreWriter() diff --git a/src/observer/table_load/ob_table_load_trans_store.h b/src/observer/table_load/ob_table_load_trans_store.h index a145d0c60b..c2f4d06263 100644 --- a/src/observer/table_load/ob_table_load_trans_store.h +++ b/src/observer/table_load/ob_table_load_trans_store.h @@ -39,21 +39,28 @@ class ObTableLoadStoreCtx; class ObTableLoadTransStore { public: - ObTableLoadTransStore(ObTableLoadTransCtx *trans_ctx) : trans_ctx_(trans_ctx) {} + ObTableLoadTransStore(ObTableLoadTransCtx *trans_ctx) : trans_ctx_(trans_ctx) + { + session_store_array_.set_tenant_id(MTL_ID()); + } ~ObTableLoadTransStore() { reset(); } int init(); void reset(); TO_STRING_KV(KP_(trans_ctx), K_(session_store_array)); struct SessionStore { - SessionStore() : session_id_(0), allocator_("TLD_SessStore") {} + SessionStore() : session_id_(0), allocator_("TLD_SessStore") + { + allocator_.set_tenant_id(MTL_ID()); + partition_table_array_.set_block_allocator(ModulePageAllocator(allocator_)); + } int32_t session_id_; common::ObArenaAllocator allocator_; common::ObArray partition_table_array_; TO_STRING_KV(K_(session_id), K_(partition_table_array)); }; ObTableLoadTransCtx *const trans_ctx_; - common::ObSEArray session_store_array_; + common::ObArray session_store_array_; }; class ObTableLoadTransStoreWriter diff --git a/src/share/table/ob_table_load_dml_stat.h b/src/share/table/ob_table_load_dml_stat.h index f208b1e958..07e45f0396 100644 --- a/src/share/table/ob_table_load_dml_stat.h +++ b/src/share/table/ob_table_load_dml_stat.h @@ -25,7 +25,11 @@ namespace table struct ObTableLoadDmlStat { public: - ObTableLoadDmlStat() : allocator_("TLD_Dmlstat") { allocator_.set_tenant_id(MTL_ID()); } + ObTableLoadDmlStat() : allocator_("TLD_Dmlstat") + { + dml_stat_array_.set_tenant_id(MTL_ID()); + allocator_.set_tenant_id(MTL_ID()); + } ~ObTableLoadDmlStat() { reset(); } void reset() { @@ -62,7 +66,7 @@ public: } TO_STRING_KV(K_(dml_stat_array)); public: - common::ObSEArray dml_stat_array_; + common::ObArray dml_stat_array_; common::ObArenaAllocator allocator_; }; diff --git a/src/share/table/ob_table_load_sql_statistics.h b/src/share/table/ob_table_load_sql_statistics.h index 6cfd7d93e2..9eb963bf85 100644 --- a/src/share/table/ob_table_load_sql_statistics.h +++ b/src/share/table/ob_table_load_sql_statistics.h @@ -25,7 +25,12 @@ struct ObTableLoadSqlStatistics { OB_UNIS_VERSION(1); public: - ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") { allocator_.set_tenant_id(MTL_ID()); } + ObTableLoadSqlStatistics() : allocator_("TLD_Opstat") + { + table_stat_array_.set_tenant_id(MTL_ID()); + col_stat_array_.set_tenant_id(MTL_ID()); + allocator_.set_tenant_id(MTL_ID()); + } ~ObTableLoadSqlStatistics() { reset(); } void reset(); bool is_empty() const @@ -40,8 +45,8 @@ public: int persistence_col_stats(); TO_STRING_KV(K_(col_stat_array), K_(table_stat_array)); public: - common::ObSEArray table_stat_array_; - common::ObSEArray col_stat_array_; + common::ObArray table_stat_array_; + common::ObArray col_stat_array_; common::ObArenaAllocator allocator_; }; diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 81ffe18e7b..4052ce484b 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -67,6 +67,7 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam() ignore_row_num_(-1), dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE) { + store_column_idxs_.set_tenant_id(MTL_ID()); } bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const @@ -233,6 +234,7 @@ int ObLoadDataDirectImpl::Logger::log_error_line(const ObString &file_name, int6 ObLoadDataDirectImpl::DataDescIterator::DataDescIterator() : pos_(0) { + data_descs_.set_tenant_id(MTL_ID()); } ObLoadDataDirectImpl::DataDescIterator::~DataDescIterator() @@ -443,6 +445,7 @@ ObLoadDataDirectImpl::DataReader::DataReader() is_iter_end_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObLoadDataDirectImpl::DataReader::~DataReader() @@ -901,6 +904,7 @@ ObLoadDataDirectImpl::FileLoadExecutor::FileLoadExecutor() total_line_count_(0), is_inited_(false) { + handle_resource_.set_tenant_id(MTL_ID()); } ObLoadDataDirectImpl::FileLoadExecutor::~FileLoadExecutor() @@ -1973,7 +1977,8 @@ int ObLoadDataDirectImpl::init_store_column_idxs(ObIArray &store_column const uint64_t table_id = load_args.table_id_; ObSchemaGetterGuard schema_guard; const ObTableSchema *table_schema = nullptr; - ObSEArray column_descs; + ObArray column_descs; + column_descs.set_tenant_id(MTL_ID()); if (OB_FAIL(ObMultiVersionSchemaService::get_instance().get_tenant_schema_guard(tenant_id, schema_guard))) { LOG_WARN("fail to get tenant schema guard", KR(ret), K(tenant_id)); diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index b91dedaf58..a77c7092bf 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -96,8 +96,7 @@ private: int64_t ignore_row_num_; // number of rows to ignore per file sql::ObLoadDupActionType dup_action_; DataAccessParam data_access_param_; - common::ObSEArray - store_column_idxs_; // Mapping of stored columns to source data columns + common::ObArray store_column_idxs_; // Mapping of stored columns to source data columns }; struct LoadExecuteContext @@ -166,7 +165,7 @@ private: int get_next_data_desc(DataDesc &data_desc, int64_t &pos); TO_STRING_KV(K_(data_descs), K_(pos)); private: - common::ObSEArray data_descs_; + common::ObArray data_descs_; int64_t pos_; }; @@ -359,7 +358,7 @@ private: // task ctrl ObParallelTaskController task_controller_; ObConcurrentFixedCircularArray handle_reserve_queue_; - common::ObSEArray handle_resource_; // 用于释放资源 + common::ObArray handle_resource_; // 用于释放资源 int64_t total_line_count_; bool is_inited_; private: diff --git a/src/storage/direct_load/ob_direct_load_data_fuse.cpp b/src/storage/direct_load/ob_direct_load_data_fuse.cpp index f240a8da09..0cb6956f59 100644 --- a/src/storage/direct_load/ob_direct_load_data_fuse.cpp +++ b/src/storage/direct_load/ob_direct_load_data_fuse.cpp @@ -286,6 +286,7 @@ int ObDirectLoadDataFuse::get_next_row(const ObDatumRow *&datum_row) ObDirectLoadSSTableDataFuse::ObDirectLoadSSTableDataFuse() : allocator_("TLD_DataFuse"), origin_iter_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTableDataFuse::~ObDirectLoadSSTableDataFuse() @@ -310,7 +311,6 @@ int ObDirectLoadSSTableDataFuse::init(const ObDirectLoadDataFuseParam ¶m, ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param), KP(origin_table), K(range)); } else { - allocator_.set_tenant_id(MTL_ID()); // construct iters if (OB_FAIL(origin_table->scan(range, allocator_, origin_iter_))) { LOG_WARN("fail to scan origin table", KR(ret)); @@ -354,6 +354,7 @@ int ObDirectLoadSSTableDataFuse::get_next_row(const ObDatumRow *&datum_row) ObDirectLoadMultipleSSTableDataFuse::ObDirectLoadMultipleSSTableDataFuse() : allocator_("TLD_DataFuse"), origin_iter_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableDataFuse::~ObDirectLoadMultipleSSTableDataFuse() @@ -379,7 +380,6 @@ int ObDirectLoadMultipleSSTableDataFuse::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid argument", K(ret), K(param), KP(origin_table), K(range)); } else { - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(range_.assign(param.tablet_id_, range))) { LOG_WARN("fail to assign range", KR(ret)); } diff --git a/src/storage/direct_load/ob_direct_load_external_fragment.cpp b/src/storage/direct_load/ob_direct_load_external_fragment.cpp index 37e9c58bf8..0ac51839fd 100644 --- a/src/storage/direct_load/ob_direct_load_external_fragment.cpp +++ b/src/storage/direct_load/ob_direct_load_external_fragment.cpp @@ -65,6 +65,7 @@ int ObDirectLoadExternalFragment::assign(const ObDirectLoadExternalFragment &oth ObDirectLoadExternalFragmentArray::ObDirectLoadExternalFragmentArray() { + fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadExternalFragmentArray::~ObDirectLoadExternalFragmentArray() diff --git a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp index 0b3de3a7ee..1676bd38ac 100644 --- a/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp +++ b/src/storage/direct_load/ob_direct_load_external_multi_partition_table.cpp @@ -57,6 +57,7 @@ ObDirectLoadExternalMultiPartitionTableBuilder::ObDirectLoadExternalMultiPartiti is_closed_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadExternalMultiPartitionTableBuilder::~ObDirectLoadExternalMultiPartitionTableBuilder() @@ -75,7 +76,6 @@ int ObDirectLoadExternalMultiPartitionTableBuilder::init( LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(alloc_tmp_file())) { LOG_WARN("fail to alloc tmp file", KR(ret)); } else if (OB_FAIL(external_writer_.init(param_.table_data_desc_.external_data_block_size_, diff --git a/src/storage/direct_load/ob_direct_load_external_scanner.h b/src/storage/direct_load/ob_direct_load_external_scanner.h index 01c229eb42..a7c2399924 100644 --- a/src/storage/direct_load/ob_direct_load_external_scanner.h +++ b/src/storage/direct_load/ob_direct_load_external_scanner.h @@ -162,7 +162,7 @@ public: void reuse(); private: common::ObArenaAllocator allocator_; - common::ObSEArray iters_; + common::ObArray iters_; ObDirectLoadExternalMerger merger_; bool is_inited_; }; @@ -171,6 +171,8 @@ template ObDirectLoadExternalSortScanner::ObDirectLoadExternalSortScanner() : allocator_("TLD_ESScanner"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + iters_.set_tenant_id(MTL_ID()); } template @@ -209,7 +211,6 @@ int ObDirectLoadExternalSortScanner::init( STORAGE_LOG(WARN, "invalid argument", KR(ret), K(compressor_type), K(fragments), KP(compare)); } else { - allocator_.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && i < fragments.count(); ++i) { const ObDirectLoadExternalFragment &fragment = fragments.at(i); ExternalReader *reader = nullptr; diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp index 24ba72aef8..3be315ca74 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table.cpp @@ -45,6 +45,8 @@ bool ObDirectLoadFastHeapTableCreateParam::is_valid() const ObDirectLoadFastHeapTable::ObDirectLoadFastHeapTable() : allocator_("TLD_FastHTable"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + column_stat_array_.set_tenant_id(MTL_ID()); } ObDirectLoadFastHeapTable::~ObDirectLoadFastHeapTable() @@ -94,7 +96,6 @@ int ObDirectLoadFastHeapTable::init(const ObDirectLoadFastHeapTableCreateParam & } else { meta_.tablet_id_ = param.tablet_id_; meta_.row_count_ = param.row_count_; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(copy_col_stat(param))){ LOG_WARN("fail to inner init", KR(ret), K(param)); } else { diff --git a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp index 6042d9e87d..e915109efb 100644 --- a/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_fast_heap_table_builder.cpp @@ -137,6 +137,8 @@ ObDirectLoadFastHeapTableBuilder::ObDirectLoadFastHeapTableBuilder() is_closed_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + column_stat_array_.set_tenant_id(MTL_ID()); } ObDirectLoadFastHeapTableBuilder::~ObDirectLoadFastHeapTableBuilder() @@ -197,7 +199,6 @@ int ObDirectLoadFastHeapTableBuilder::init(const ObDirectLoadFastHeapTableBuildP } else { bool has_lob_storage = param.lob_column_cnt_ > 0? true :false; param_ = param; - allocator_.set_tenant_id(MTL_ID()); if (param.online_opt_stat_gather_ && OB_FAIL(init_sql_statistics())) { LOG_WARN("fail to inner init sql statistics", KR(ret)); } else if (OB_FAIL(param_.insert_table_ctx_->get_tablet_context( diff --git a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp index 534d6334b9..fcd67e480a 100644 --- a/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_insert_table_ctx.cpp @@ -489,7 +489,11 @@ int ObDirectLoadInsertTabletContext::cancel() * ObDirectLoadInsertTableContext */ -ObDirectLoadInsertTableContext::ObDirectLoadInsertTableContext() : is_inited_(false) {} +ObDirectLoadInsertTableContext::ObDirectLoadInsertTableContext() + : allocator_("TLD_InsertTbl"), is_inited_(false) +{ + allocator_.set_tenant_id(MTL_ID()); +} ObDirectLoadInsertTableContext::~ObDirectLoadInsertTableContext() { destory(); } diff --git a/src/storage/direct_load/ob_direct_load_mem_chunk.h b/src/storage/direct_load/ob_direct_load_mem_chunk.h index 778d798d51..019fcbed7e 100644 --- a/src/storage/direct_load/ob_direct_load_mem_chunk.h +++ b/src/storage/direct_load/ob_direct_load_mem_chunk.h @@ -124,6 +124,8 @@ ObDirectLoadMemChunk::ObDirectLoadMemChunk() allocator_("TLD_MemChunk"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + item_list_.set_attr(ObMemAttr(MTL_ID(), "TLD_MemChunk")); } template @@ -138,8 +140,6 @@ int ObDirectLoadMemChunk::init(uint64_t tenant_id, int64_t mem_limit STORAGE_LOG(WARN, "invalid argument", KR(ret), K(mem_limit)); } else { buf_mem_limit_ = mem_limit; - allocator_.set_tenant_id(tenant_id); - item_list_.set_attr(ObMemAttr(tenant_id, "TLD_MemChunk")); is_inited_ = true; } return ret; diff --git a/src/storage/direct_load/ob_direct_load_mem_context.cpp b/src/storage/direct_load/ob_direct_load_mem_context.cpp index 62c56bc8a1..8a58cb6573 100644 --- a/src/storage/direct_load/ob_direct_load_mem_context.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_context.cpp @@ -97,6 +97,7 @@ void ObDirectLoadMemContext::reset() has_error_ = false; ObArray loader_array; + loader_array.set_tenant_id(MTL_ID()); mem_loader_queue_.pop_all(loader_array); for (int64_t i = 0; i < loader_array.count(); i ++) { ObDirectLoadMemWorker *tmp = loader_array.at(i); @@ -106,6 +107,7 @@ void ObDirectLoadMemContext::reset() } ObArray chunk_array; + chunk_array.set_tenant_id(MTL_ID()); mem_chunk_queue_.pop_all(chunk_array); for (int64_t i = 0; i < chunk_array.count(); i ++) { ObDirectLoadExternalMultiPartitionRowChunk *chunk = chunk_array.at(i); @@ -144,7 +146,6 @@ ObDirectLoadMemContext::~ObDirectLoadMemContext() int ObDirectLoadMemContext::init() { int ret = OB_SUCCESS; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(mem_dump_queue_.init(1024))) { STORAGE_LOG(WARN, "fail to init mem dump queue", KR(ret)); } @@ -156,6 +157,7 @@ int ObDirectLoadMemContext::add_tables_from_table_builder(ObIDirectLoadPartition int ret = OB_SUCCESS; lib::ObMutexGuard guard(mutex_); ObArray table_array; + table_array.set_tenant_id(MTL_ID()); if (OB_SUCC(ret)) { if (OB_FAIL(builder.get_tables(table_array, allocator_))) { LOG_WARN("fail to get tables", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_mem_context.h b/src/storage/direct_load/ob_direct_load_mem_context.h index b172783157..fef64514e2 100644 --- a/src/storage/direct_load/ob_direct_load_mem_context.h +++ b/src/storage/direct_load/ob_direct_load_mem_context.h @@ -63,7 +63,11 @@ public: mem_dump_task_count_(0), running_dump_count_(0), allocator_("TLD_mem_ctx"), - has_error_(false) {} + has_error_(false) + { + allocator_.set_tenant_id(MTL_ID()); + tables_.set_tenant_id(MTL_ID()); + } ~ObDirectLoadMemContext(); diff --git a/src/storage/direct_load/ob_direct_load_mem_dump.cpp b/src/storage/direct_load/ob_direct_load_mem_dump.cpp index c8515bd6c0..d31797f490 100644 --- a/src/storage/direct_load/ob_direct_load_mem_dump.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_dump.cpp @@ -38,6 +38,8 @@ ObDirectLoadMemDump::Context::Context() sub_dump_count_(0) { allocator_.set_tenant_id(MTL_ID()); + mem_chunk_array_.set_tenant_id(MTL_ID()); + all_tables_.set_tenant_id(MTL_ID()); } ObDirectLoadMemDump::Context::~Context() @@ -85,6 +87,7 @@ ObDirectLoadMemDump::ObDirectLoadMemDump(ObDirectLoadMemContext *mem_ctx, extra_buf_(nullptr), extra_buf_size_(0) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMemDump::~ObDirectLoadMemDump() {} @@ -218,7 +221,8 @@ int ObDirectLoadMemDump::dump_tables() ObIDirectLoadPartitionTableBuilder *table_builder = nullptr; - allocator_.set_tenant_id(MTL_ID()); + iters.set_tenant_id(MTL_ID()); + chunk_iters.set_tenant_id(MTL_ID()); extra_buf_size_ = mem_ctx_->table_data_desc_.extra_buf_size_; if (OB_ISNULL(extra_buf_ = static_cast(allocator_.alloc(extra_buf_size_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -369,6 +373,7 @@ int ObDirectLoadMemDump::compact_tables() { int ret = OB_SUCCESS; ObArray keys; + keys.set_tenant_id(MTL_ID()); if (OB_FAIL(context_ptr_->tables_.get_all_key(keys))) { LOG_WARN("fail to get all keys", KR(ret)); } @@ -386,6 +391,7 @@ int ObDirectLoadMemDump::compact_tablet_tables(const ObTabletID &tablet_id) ObIDirectLoadTabletTableCompactor *compactor = nullptr; ObArray> table_array; + table_array.set_tenant_id(MTL_ID()); if (OB_FAIL(context_ptr_->tables_.get(tablet_id, table_array))) { LOG_WARN("fail to get table array", K(tablet_id), KR(ret)); } else { diff --git a/src/storage/direct_load/ob_direct_load_mem_sample.cpp b/src/storage/direct_load/ob_direct_load_mem_sample.cpp index ec9edf7c4c..961987e804 100644 --- a/src/storage/direct_load/ob_direct_load_mem_sample.cpp +++ b/src/storage/direct_load/ob_direct_load_mem_sample.cpp @@ -34,6 +34,7 @@ int ObDirectLoadMemSample::gen_ranges(ObIArray &chunks, ObIArray sample_rows; + sample_rows.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && i < DEFAULT_SAMPLE_TIMES; i ++) { int idx = abs(rand()) % chunks.count(); ChunkType *chunk = chunks.at(idx); @@ -81,6 +82,8 @@ int ObDirectLoadMemSample::do_work() auto context_ptr = ObTableLoadHandle::make_handle(); context_ptr->sub_dump_count_ = range_count_; + chunks.set_tenant_id(MTL_ID()); + ranges.set_tenant_id(MTL_ID()); mem_ctx_->mem_chunk_queue_.pop_all(chunks); if (OB_FAIL(context_ptr->init())) { @@ -121,7 +124,8 @@ int ObDirectLoadMemSample::add_dump(int64_t idx, ObTableLoadHandle context_ptr) { int ret = OB_SUCCESS; - storage::ObDirectLoadMemDump *mem_dump = OB_NEW(ObDirectLoadMemDump, "TLD_mem_dump", mem_ctx_, range, context_ptr, idx); + storage::ObDirectLoadMemDump *mem_dump = OB_NEW( + ObDirectLoadMemDump, ObMemAttr(MTL_ID(), "TLD_mem_dump"), mem_ctx_, range, context_ptr, idx); if (mem_dump == nullptr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate mem dump", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp index 75a8b2d4ac..4436fe780b 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.cpp +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.cpp @@ -82,6 +82,8 @@ bool ObDirectLoadMergeParam::is_valid() const ObDirectLoadMergeCtx::ObDirectLoadMergeCtx() : allocator_("TLD_MergeCtx"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + tablet_merge_ctx_array_.set_tenant_id(MTL_ID()); } ObDirectLoadMergeCtx::~ObDirectLoadMergeCtx() @@ -109,7 +111,6 @@ int ObDirectLoadMergeCtx::init(const ObDirectLoadMergeParam ¶m, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(ls_partition_ids), K(target_ls_partition_ids)); } else { - allocator_.set_tenant_id(MTL_ID()); param_ = param; if (OB_FAIL(create_all_tablet_ctxs(ls_partition_ids, target_ls_partition_ids))) { LOG_WARN("fail to create all tablet ctxs", KR(ret)); @@ -159,6 +160,13 @@ int ObDirectLoadMergeCtx::create_all_tablet_ctxs( ObDirectLoadTabletMergeCtx::ObDirectLoadTabletMergeCtx() : allocator_("TLD_MegTbtCtx"), task_finish_count_(0), rescan_task_finish_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + sstable_array_.set_tenant_id(MTL_ID()); + multiple_sstable_array_.set_tenant_id(MTL_ID()); + multiple_heap_table_array_.set_tenant_id(MTL_ID()); + range_array_.set_tenant_id(MTL_ID()); + task_array_.set_tenant_id(MTL_ID()); + rescan_task_array_.set_tenant_id(MTL_ID()); } ObDirectLoadTabletMergeCtx::~ObDirectLoadTabletMergeCtx() @@ -199,7 +207,6 @@ int ObDirectLoadTabletMergeCtx::init(const ObDirectLoadMergeParam ¶m, if (OB_FAIL(origin_table_.init(origin_table_param))) { LOG_WARN("fail to init origin sstable", KR(ret)); } else { - allocator_.set_tenant_id(MTL_ID()); param_ = param; target_partition_id_ = target_ls_partition_id.part_tablet_id_.partition_id_; tablet_id_ = ls_partition_id.part_tablet_id_.tablet_id_; diff --git a/src/storage/direct_load/ob_direct_load_merge_ctx.h b/src/storage/direct_load/ob_direct_load_merge_ctx.h index 323b549df0..db3db98024 100644 --- a/src/storage/direct_load/ob_direct_load_merge_ctx.h +++ b/src/storage/direct_load/ob_direct_load_merge_ctx.h @@ -93,7 +93,7 @@ private: private: common::ObArenaAllocator allocator_; ObDirectLoadMergeParam param_; - common::ObSEArray tablet_merge_ctx_array_; + common::ObArray tablet_merge_ctx_array_; bool is_inited_; }; @@ -163,12 +163,12 @@ private: common::ObTabletID tablet_id_; common::ObTabletID target_tablet_id_; ObDirectLoadOriginTable origin_table_; - common::ObSEArray sstable_array_; - common::ObSEArray multiple_sstable_array_; - common::ObSEArray multiple_heap_table_array_; - common::ObSEArray range_array_; - common::ObSEArray task_array_; - common::ObSEArray rescan_task_array_; + common::ObArray sstable_array_; + common::ObArray multiple_sstable_array_; + common::ObArray multiple_heap_table_array_; + common::ObArray range_array_; + common::ObArray task_array_; + common::ObArray rescan_task_array_; int64_t task_finish_count_ CACHE_ALIGNED; int64_t rescan_task_finish_count_ CACHE_ALIGNED; bool is_inited_; diff --git a/src/storage/direct_load/ob_direct_load_multi_map.h b/src/storage/direct_load/ob_direct_load_multi_map.h index eef14b4347..1d67fa05df 100644 --- a/src/storage/direct_load/ob_direct_load_multi_map.h +++ b/src/storage/direct_load/ob_direct_load_multi_map.h @@ -51,8 +51,12 @@ public: ret = map_.get_refactored(key, bag); if (ret == common::OB_HASH_NOT_EXIST) { ret = OB_SUCCESS; - bag = OB_NEW(common::ObArray, "TLD_MM_bag", OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator("TLD_MM_bagi", MTL_ID())); - if (OB_FAIL(map_.set_refactored(key, bag))) { + bag = OB_NEW(common::ObArray, ObMemAttr(MTL_ID(), "TLD_MM_bag"), + OB_MALLOC_NORMAL_BLOCK_SIZE, ModulePageAllocator("TLD_MM_bagi", MTL_ID())); + if (OB_ISNULL(bag)) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to new bag", KR(ret)); + } else if (OB_FAIL(map_.set_refactored(key, bag))) { STORAGE_LOG(WARN, "fail to put bag", KR(ret)); } } else if (ret != OB_SUCCESS) { diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table.cpp index 9304ee9ae5..c123610b0b 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table.cpp @@ -64,6 +64,7 @@ ObDirectLoadMultipleHeapTableCreateParam::ObDirectLoadMultipleHeapTableCreatePar row_count_(0), max_data_block_size_(0) { + data_fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleHeapTableCreateParam::~ObDirectLoadMultipleHeapTableCreateParam() @@ -117,12 +118,13 @@ void ObDirectLoadMultipleHeapTableMeta::reset() } /** - * ObDirectLoadMultipleSSTable + * ObDirectLoadMultipleHeapTable */ ObDirectLoadMultipleHeapTable::ObDirectLoadMultipleHeapTable() : is_inited_(false) { + data_fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleHeapTable::~ObDirectLoadMultipleHeapTable() diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_compactor.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_compactor.cpp index d0dec93f09..e452cbf1f8 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_compactor.cpp @@ -64,6 +64,10 @@ ObDirectLoadMultipleHeapTableCompactor::ObDirectLoadMultipleHeapTableCompactor() is_stop_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + index_scanners_.set_tenant_id(MTL_ID()); + base_data_fragment_idxs_.set_tenant_id(MTL_ID()); + data_fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleHeapTableCompactor::~ObDirectLoadMultipleHeapTableCompactor() @@ -127,7 +131,6 @@ int ObDirectLoadMultipleHeapTableCompactor::init( LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; - allocator_.set_tenant_id(MTL_ID()); is_inited_ = true; } return ret; diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp index c04ce8863e..aa730fc2f0 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_index_scan_merge.cpp @@ -30,6 +30,7 @@ ObDirectLoadMultipleHeapTableIndexScanMerge::ObDirectLoadMultipleHeapTableIndexS rows_merger_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleHeapTableIndexScanMerge::~ObDirectLoadMultipleHeapTableIndexScanMerge() @@ -47,7 +48,6 @@ int ObDirectLoadMultipleHeapTableIndexScanMerge::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(scanners.count())); } else { - allocator_.set_tenant_id(MTL_ID()); if (scanners.count() > 1) { // init consumers if (OB_ISNULL(consumers_ = static_cast( diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_map.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_map.cpp index 997de813b5..e74fa8a89b 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_map.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_map.cpp @@ -27,12 +27,12 @@ ObDirectLoadMultipleHeapTableMap::ObDirectLoadMultipleHeapTableMap(int64_t mem_l allocator_("TLD_HT_map"), mem_limit_(mem_limit) { + allocator_.set_tenant_id(MTL_ID()); } int ObDirectLoadMultipleHeapTableMap::init() { int ret = OB_SUCCESS; - allocator_.set_tenant_id(MTL_ID()); ret = tablet_map_.init(); return ret; } diff --git a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp index 416d0a3ea2..422bf24f03 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_heap_table_sorter.cpp @@ -35,6 +35,7 @@ ObDirectLoadMultipleHeapTableSorter::ObDirectLoadMultipleHeapTableSorter( heap_table_array_(nullptr), heap_table_allocator_(nullptr) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleHeapTableSorter::~ObDirectLoadMultipleHeapTableSorter() @@ -44,7 +45,6 @@ ObDirectLoadMultipleHeapTableSorter::~ObDirectLoadMultipleHeapTableSorter() int ObDirectLoadMultipleHeapTableSorter::init() { int ret = OB_SUCCESS; - allocator_.set_tenant_id(MTL_ID()); if (OB_ISNULL(extra_buf_ = static_cast(allocator_.alloc(mem_ctx_->table_data_desc_.extra_buf_size_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate extra buf", KR(ret)); @@ -76,6 +76,7 @@ int ObDirectLoadMultipleHeapTableSorter::close_chunk(ObDirectLoadMultipleHeapTab ObArray keys; ObDirectLoadMultipleHeapTableBuilder table_builder; ObDirectLoadMultipleHeapTableBuildParam table_builder_param; + keys.set_tenant_id(MTL_ID()); table_builder_param.table_data_desc_ = mem_ctx_->table_data_desc_; table_builder_param.file_mgr_ = mem_ctx_->file_mgr_; table_builder_param.extra_buf_size_ = mem_ctx_->table_data_desc_.extra_buf_size_; @@ -103,6 +104,7 @@ int ObDirectLoadMultipleHeapTableSorter::close_chunk(ObDirectLoadMultipleHeapTab for (int64_t i = 0; OB_SUCC(ret) && i < keys.count(); i ++) { ObArray bag; + bag.set_tenant_id(MTL_ID()); if (OB_FAIL(chunk->get(keys.at(i), bag))) { LOG_WARN("fail to get bag", KR(ret)); } @@ -135,6 +137,7 @@ int ObDirectLoadMultipleHeapTableSorter::get_tables( { int ret = OB_SUCCESS; ObArray table_array; + table_array.set_tenant_id(MTL_ID()); if (OB_FAIL(table_builder.get_tables(table_array, *heap_table_allocator_))) { LOG_WARN("fail to get tables", KR(ret)); } diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp index bac003f42e..d144b2f225 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable.cpp @@ -72,6 +72,7 @@ ObDirectLoadMultipleSSTableCreateParam::ObDirectLoadMultipleSSTableCreateParam() row_count_(0), max_data_block_size_(0) { + fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableCreateParam::~ObDirectLoadMultipleSSTableCreateParam() @@ -126,6 +127,8 @@ void ObDirectLoadMultipleSSTableMeta::reset() ObDirectLoadMultipleSSTable::ObDirectLoadMultipleSSTable() : allocator_("TLD_MSSTable"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTable::~ObDirectLoadMultipleSSTable() @@ -151,7 +154,6 @@ int ObDirectLoadMultipleSSTable::init(const ObDirectLoadMultipleSSTableCreatePar ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param)); } else { - allocator_.set_tenant_id(MTL_ID()); meta_.rowkey_column_num_ = param.rowkey_column_num_; meta_.column_count_ = param.column_count_; meta_.index_block_size_ = param.index_block_size_; @@ -186,7 +188,6 @@ int ObDirectLoadMultipleSSTable::copy(const ObDirectLoadMultipleSSTable &other) LOG_WARN("invalid args", KR(ret), K(other)); } else { reset(); - allocator_.set_tenant_id(MTL_ID()); meta_ = other.meta_; if (OB_FAIL(start_key_.deep_copy(other.start_key_, allocator_))) { LOG_WARN("fail to deep copy rowkey", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp index 91c31a44f2..a75f1a4cfa 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_builder.cpp @@ -99,6 +99,8 @@ ObDirectLoadMultipleSSTableBuilder::ObDirectLoadMultipleSSTableBuilder() is_closed_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + last_rowkey_allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableBuilder::~ObDirectLoadMultipleSSTableBuilder() @@ -116,8 +118,6 @@ int ObDirectLoadMultipleSSTableBuilder::init(const ObDirectLoadMultipleSSTableBu LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; - allocator_.set_tenant_id(MTL_ID()); - last_rowkey_allocator_.set_tenant_id(MTL_ID()); int64_t dir_id = -1; if (OB_FAIL(param_.file_mgr_->alloc_dir(dir_id))) { LOG_WARN("fail to alloc dir", KR(ret)); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp index 1375a1368d..995d4b77ac 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_compactor.cpp @@ -51,6 +51,9 @@ ObDirectLoadMultipleSSTableCompactor::ObDirectLoadMultipleSSTableCompactor() end_key_allocator_("TLD_ERowkey"), is_inited_(false) { + start_key_allocator_.set_tenant_id(MTL_ID()); + end_key_allocator_.set_tenant_id(MTL_ID()); + fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableCompactor::~ObDirectLoadMultipleSSTableCompactor() @@ -68,8 +71,6 @@ int ObDirectLoadMultipleSSTableCompactor::init(const ObDirectLoadMultipleSSTable LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; - start_key_allocator_.set_tenant_id(MTL_ID()); - end_key_allocator_.set_tenant_id(MTL_ID()); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); is_inited_ = true; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp index 83b877c767..4ef0b631b1 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.cpp @@ -57,6 +57,9 @@ ObDirectLoadMultipleSSTableScanMerge::ObDirectLoadMultipleSSTableScanMerge() rows_merger_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + scanners_.set_tenant_id(MTL_ID()); + rows_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableScanMerge::~ObDirectLoadMultipleSSTableScanMerge() { reset(); } @@ -100,7 +103,6 @@ int ObDirectLoadMultipleSSTableScanMerge::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(sstable_array), K(range)); } else { - allocator_.set_tenant_id(MTL_ID()); // construct scanners for (int64_t i = 0; OB_SUCC(ret) && i < sstable_array.count(); ++i) { ObDirectLoadMultipleSSTable *sstable = sstable_array.at(i); diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h index 8e5efc8447..63e0bc49a8 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scan_merge.h @@ -71,7 +71,7 @@ private: const blocksstable::ObStorageDatumUtils *datum_utils_; ObDirectLoadDMLRowHandler *dml_row_handler_; const ObDirectLoadMultipleDatumRange *range_; - common::ObSEArray scanners_; + common::ObArray scanners_; int64_t *consumers_; int64_t consumer_cnt_; LoserTreeCompare compare_; diff --git a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp index c5f3013132..ba68bbc630 100644 --- a/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_multiple_sstable_scanner.cpp @@ -31,6 +31,7 @@ ObDirectLoadMultipleSSTableScanner::ObDirectLoadMultipleSSTableScanner() is_iter_end_(false), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableScanner::~ObDirectLoadMultipleSSTableScanner() @@ -56,7 +57,6 @@ int ObDirectLoadMultipleSSTableScanner::init(ObDirectLoadMultipleSSTable *sstabl table_data_desc_ = table_data_desc; range_ = ⦥ datum_utils_ = datum_utils; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(data_block_scanner_.init(sstable, table_data_desc, range, datum_utils))) { LOG_WARN("fail to init data block scanner", KR(ret)); } else if (OB_FAIL(data_block_reader_.init(table_data_desc.sstable_data_block_size_, diff --git a/src/storage/direct_load/ob_direct_load_origin_table.cpp b/src/storage/direct_load/ob_direct_load_origin_table.cpp index b06ca36814..8a3788d8c0 100644 --- a/src/storage/direct_load/ob_direct_load_origin_table.cpp +++ b/src/storage/direct_load/ob_direct_load_origin_table.cpp @@ -64,6 +64,7 @@ ObDirectLoadOriginTableMeta::~ObDirectLoadOriginTableMeta() ObDirectLoadOriginTable::ObDirectLoadOriginTable() : major_sstable_(nullptr), is_inited_(false) { + ddl_sstables_.set_tenant_id(MTL_ID()); } ObDirectLoadOriginTable::~ObDirectLoadOriginTable() @@ -195,6 +196,8 @@ int ObDirectLoadOriginTable::scan(const ObDatumRange &key_range, ObDirectLoadOriginTableScanner::ObDirectLoadOriginTableScanner() : allocator_("TLD_OriSSTScan"), origin_table_(nullptr), schema_param_(allocator_), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + col_ids_.set_tenant_id(MTL_ID()); } ObDirectLoadOriginTableScanner::~ObDirectLoadOriginTableScanner() @@ -214,7 +217,6 @@ int ObDirectLoadOriginTableScanner::init(ObDirectLoadOriginTable *origin_table, LOG_WARN("Invalid argument", KR(ret), KPC(origin_table), K(query_range)); } else { origin_table_ = origin_table; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL((init_table_access_param()))) { LOG_WARN("fail to init query range", KR(ret)); } else if (OB_FAIL(init_table_access_ctx())) { diff --git a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp index 4d790462de..3835b31e8f 100644 --- a/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp +++ b/src/storage/direct_load/ob_direct_load_partition_merge_task.cpp @@ -43,6 +43,7 @@ ObDirectLoadPartitionMergeTask::ObDirectLoadPartitionMergeTask() is_inited_(false) { allocator_.set_tenant_id(MTL_ID()); + column_stat_array_.set_tenant_id(MTL_ID()); } ObDirectLoadPartitionMergeTask::~ObDirectLoadPartitionMergeTask() @@ -891,6 +892,7 @@ ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::RowIterat dml_row_handler_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::~RowIterator() @@ -920,7 +922,6 @@ int ObDirectLoadPartitionHeapTableMultipleAggregateMergeTask::RowIterator::init( LOG_WARN("invalid args", KR(ret), K(merge_param), K(tablet_id), KP(origin_table), KP(heap_table_array)); } else { - allocator_.set_tenant_id(MTL_ID()); range_.set_whole_range(); // init row iterator ObDirectLoadInsertTableRowIteratorParam row_iterator_param; diff --git a/src/storage/direct_load/ob_direct_load_range_splitter.cpp b/src/storage/direct_load/ob_direct_load_range_splitter.cpp index e197bbd077..0884a36d71 100644 --- a/src/storage/direct_load/ob_direct_load_range_splitter.cpp +++ b/src/storage/direct_load/ob_direct_load_range_splitter.cpp @@ -423,6 +423,8 @@ int ObDirectLoadRowkeyMergeRangeSplitter::split_range(ObIArray &ra ObDirectLoadSSTableRangeSplitter::ObDirectLoadSSTableRangeSplitter() : allocator_("TLD_SSTRGSplit"), total_block_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_iters_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTableRangeSplitter::~ObDirectLoadSSTableRangeSplitter() @@ -446,7 +448,6 @@ int ObDirectLoadSSTableRangeSplitter::init(const ObIArray ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(sstable_array), KP(datum_utils)); } else { - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(construct_rowkey_iters(sstable_array))) { LOG_WARN("fail to construct rowkey itres", KR(ret)); } else if (OB_FAIL( @@ -505,6 +506,8 @@ int ObDirectLoadSSTableRangeSplitter::split_range(ObIArray &range_ ObDirectLoadMergeRangeSplitter::ObDirectLoadMergeRangeSplitter() : allocator_("TLD_MegRGSplit"), total_block_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_iters_.set_tenant_id(MTL_ID()); } ObDirectLoadMergeRangeSplitter::~ObDirectLoadMergeRangeSplitter() @@ -531,7 +534,6 @@ int ObDirectLoadMergeRangeSplitter::init(ObDirectLoadOriginTable *origin_table, ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), KPC(origin_table), K(sstable_array), KP(datum_utils)); } else { - allocator_.set_tenant_id(MTL_ID()); scan_range_.set_whole_range(); if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters( origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) { @@ -597,6 +599,8 @@ int ObDirectLoadMergeRangeSplitter::split_range(ObIArray &range_ar ObDirectLoadMultipleMergeTabletRangeSplitter::ObDirectLoadMultipleMergeTabletRangeSplitter() : allocator_("TLD_MulMegTRS"), total_block_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_iters_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleMergeTabletRangeSplitter::~ObDirectLoadMultipleMergeTabletRangeSplitter() @@ -629,7 +633,6 @@ int ObDirectLoadMultipleMergeTabletRangeSplitter::init( K(table_data_desc), KP(datum_utils)); } else { tablet_id_ = tablet_id; - allocator_.set_tenant_id(MTL_ID()); scan_range_.set_whole_range(); if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters( origin_table, scan_range_, allocator_, total_block_count_, rowkey_iters_))) { @@ -739,6 +742,8 @@ ObDirectLoadMultipleMergeRangeSplitter::ObDirectLoadMultipleMergeRangeSplitter() last_rowkey_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_iters_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleMergeRangeSplitter::~ObDirectLoadMultipleMergeRangeSplitter() @@ -766,7 +771,6 @@ int ObDirectLoadMultipleMergeRangeSplitter::init( ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(sstable_array), K(table_data_desc), KP(datum_utils)); } else { - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(construct_rowkey_iters(sstable_array, table_data_desc, datum_utils))) { LOG_WARN("fail to construct sstable rowkey itres", KR(ret)); } else if (OB_FAIL(compare_.init(*datum_utils))) { @@ -837,6 +841,7 @@ int ObDirectLoadMultipleMergeRangeSplitter::get_rowkeys_by_origin( const ObDatumRowkey *datum_rowkey = nullptr; ObDatumRowkey copied_rowkey; int64_t count = 0; + rowkey_iters.set_tenant_id(MTL_ID()); if (OB_FAIL(ObDirectLoadRangeSplitUtils::construct_origin_table_rowkey_iters( origin_table, scan_range, allocator, unused_total_block_count, rowkey_iters))) { LOG_WARN("fail to construct origin table rowkey iters", KR(ret)); @@ -1054,6 +1059,8 @@ int ObDirectLoadMultipleMergeRangeSplitter::split_range(ObTabletID &tablet_id, ObArray origin_rowkey_array; ObArray multiple_rowkey_array; tmp_allocator.set_tenant_id(MTL_ID()); + origin_rowkey_array.set_block_allocator(ModulePageAllocator(tmp_allocator)); + multiple_rowkey_array.set_block_allocator(ModulePageAllocator(tmp_allocator)); if (OB_FAIL(get_rowkeys_by_origin(origin_table, origin_rowkey_array, tmp_allocator))) { LOG_WARN("fail to get rowkeys by origin", KR(ret)); } else if (OB_FAIL(get_rowkeys_by_multiple(tablet_id, multiple_rowkey_array, tmp_allocator))) { @@ -1075,6 +1082,8 @@ int ObDirectLoadMultipleMergeRangeSplitter::split_range(ObTabletID &tablet_id, ObDirectLoadMultipleSSTableRangeSplitter::ObDirectLoadMultipleSSTableRangeSplitter() : allocator_("TLD_MulSSTRS"), datum_utils_(nullptr), total_block_count_(0), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_iters_.set_tenant_id(MTL_ID()); } ObDirectLoadMultipleSSTableRangeSplitter::~ObDirectLoadMultipleSSTableRangeSplitter() @@ -1102,7 +1111,6 @@ int ObDirectLoadMultipleSSTableRangeSplitter::init( LOG_WARN("invalid args", KR(ret), K(sstable_array), K(table_data_desc), KP(datum_utils)); } else { datum_utils_ = datum_utils; - allocator_.set_tenant_id(MTL_ID()); if (OB_FAIL(construct_rowkey_iters(sstable_array, table_data_desc, datum_utils))) { LOG_WARN("fail to construct rowkey iters", KR(ret)); } else if (OB_FAIL(compare_.init(*datum_utils))) { diff --git a/src/storage/direct_load/ob_direct_load_range_splitter.h b/src/storage/direct_load/ob_direct_load_range_splitter.h index 4e27d43542..a4b569342a 100644 --- a/src/storage/direct_load/ob_direct_load_range_splitter.h +++ b/src/storage/direct_load/ob_direct_load_range_splitter.h @@ -96,7 +96,7 @@ private: int construct_rowkey_iters(const common::ObIArray &sstable_array); private: common::ObArenaAllocator allocator_; - ObSEArray rowkey_iters_; + ObArray rowkey_iters_; int64_t total_block_count_; ObDirectLoadRowkeyMergeRangeSplitter rowkey_merge_splitter_; bool is_inited_; @@ -118,7 +118,7 @@ private: private: common::ObArenaAllocator allocator_; blocksstable::ObDatumRange scan_range_; - ObSEArray rowkey_iters_; + ObArray rowkey_iters_; int64_t total_block_count_; ObDirectLoadRowkeyMergeRangeSplitter rowkey_merge_splitter_; bool is_inited_; @@ -149,7 +149,7 @@ private: common::ObArenaAllocator allocator_; common::ObTabletID tablet_id_; blocksstable::ObDatumRange scan_range_; - ObSEArray rowkey_iters_; + ObArray rowkey_iters_; int64_t total_block_count_; ObDirectLoadRowkeyMergeRangeSplitter rowkey_merge_splitter_; bool is_inited_; @@ -223,7 +223,7 @@ private: private: common::ObArenaAllocator allocator_; const blocksstable::ObStorageDatumUtils *datum_utils_; - ObSEArray rowkey_iters_; + ObArray rowkey_iters_; int64_t total_block_count_; ObDirectLoadMultipleDatumRowkeyCompare compare_; RowkeyMerger rowkey_merger_; diff --git a/src/storage/direct_load/ob_direct_load_sstable.cpp b/src/storage/direct_load/ob_direct_load_sstable.cpp index ffdd5dc723..d2fdda00d0 100644 --- a/src/storage/direct_load/ob_direct_load_sstable.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable.cpp @@ -110,6 +110,8 @@ void ObDirectLoadSSTableMeta::reset() ObDirectLoadSSTable::ObDirectLoadSSTable() : allocator_("TLD_SSTable"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + fragments_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTable::~ObDirectLoadSSTable() {} @@ -131,7 +133,6 @@ int ObDirectLoadSSTable::init(ObDirectLoadSSTableCreateParam ¶m) ret = OB_INIT_TWICE; LOG_WARN("ObDirectLoadSSTable init twice", KR(ret), KP(this)); } else { - allocator_.set_tenant_id(MTL_ID()); meta_.tablet_id_ = param.tablet_id_; meta_.rowkey_column_count_ = param.rowkey_column_count_; meta_.column_count_ = param.column_count_; @@ -172,7 +173,6 @@ int ObDirectLoadSSTable::copy(const ObDirectLoadSSTable &other) LOG_WARN("invalid args", KR(ret), K(other)); } else { reset(); - allocator_.set_tenant_id(MTL_ID()); meta_ = other.meta_; if (meta_.row_count_ > 0) { if (OB_FAIL(other.start_key_.deep_copy(start_key_, allocator_))) { diff --git a/src/storage/direct_load/ob_direct_load_sstable.h b/src/storage/direct_load/ob_direct_load_sstable.h index 531278992f..27621a7bcd 100644 --- a/src/storage/direct_load/ob_direct_load_sstable.h +++ b/src/storage/direct_load/ob_direct_load_sstable.h @@ -100,6 +100,7 @@ public: index_block_count_(0), row_count_(0) { + fragments_.set_tenant_id(MTL_ID()); } bool is_valid() const { diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp index a25b556a9a..369e5df90c 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.cpp @@ -53,8 +53,6 @@ int ObDirectLoadSSTableBuilder::init(const ObDirectLoadSSTableBuildParam ¶m) } else { const uint64_t tenant_id = MTL_ID(); param_ = param; - allocator_.set_tenant_id(tenant_id); - rowkey_allocator_.set_tenant_id(tenant_id); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); int64_t dir_id = -1; @@ -269,6 +267,7 @@ ObDirectLoadDataBlockWriter2::ObDirectLoadDataBlockWriter2() is_inited_(false), is_closed_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadDataBlockWriter2::~ObDirectLoadDataBlockWriter2() { reset(); } @@ -305,7 +304,6 @@ int ObDirectLoadDataBlockWriter2::init(uint64_t tenant_id, int64_t buf_size, } else if (OB_FAIL(file_io_handle_.open(file_handle))) { LOG_WARN("fail to open file handle", KR(ret)); } else { - allocator_.set_tenant_id(tenant_id); if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); @@ -343,7 +341,8 @@ int ObDirectLoadDataBlockWriter2::write_large_item(const ObDirectLoadExternalRow int ret = OB_SUCCESS; char *new_buf; const int64_t align_buf_size = upper_align(new_buf_size, DIO_ALIGN_SIZE); - if (OB_ISNULL(new_buf = static_cast(ob_malloc(align_buf_size, ObModIds::OB_SQL_LOAD_DATA)))) { + ObMemAttr attr(MTL_ID(), "TLD_LargeBuf"); + if (OB_ISNULL(new_buf = static_cast(ob_malloc(align_buf_size, attr)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(align_buf_size)); } else { @@ -463,6 +462,7 @@ ObDirectLoadIndexBlockWriter::ObDirectLoadIndexBlockWriter() is_inited_(false), is_closed_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadIndexBlockWriter::~ObDirectLoadIndexBlockWriter() { reset(); } @@ -498,7 +498,6 @@ int ObDirectLoadIndexBlockWriter::init(uint64_t tenant_id, int64_t buf_size, } else if (OB_FAIL(file_io_handle_.open(file_handle))) { LOG_WARN("fail to open file handle", KR(ret)); } else { - allocator_.set_tenant_id(tenant_id); if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); @@ -608,6 +607,7 @@ ObDirectLoadIndexBlockReader::ObDirectLoadIndexBlockReader() allocator_("TLD_IBReader"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } int ObDirectLoadIndexBlockReader::init(uint64_t tenant_id, int64_t buf_size, @@ -623,7 +623,6 @@ int ObDirectLoadIndexBlockReader::init(uint64_t tenant_id, int64_t buf_size, } else if (OB_FAIL(file_io_handle_.open(file_handle))) { LOG_WARN("fail to open file handle", KR(ret)); } else { - allocator_.set_tenant_id(tenant_id); if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); diff --git a/src/storage/direct_load/ob_direct_load_sstable_builder.h b/src/storage/direct_load/ob_direct_load_sstable_builder.h index 14aa0e9706..ba9dddb9a1 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_builder.h +++ b/src/storage/direct_load/ob_direct_load_sstable_builder.h @@ -162,8 +162,14 @@ private: class ObDirectLoadSSTableBuilder : public ObIDirectLoadPartitionTableBuilder { public: - ObDirectLoadSSTableBuilder() : allocator_("TLD_sstablebdr"), is_closed_(false), is_inited_(false) + ObDirectLoadSSTableBuilder() + : allocator_("TLD_sstablebdr"), + rowkey_allocator_("TLD_Rowkey"), + is_closed_(false), + is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + rowkey_allocator_.set_tenant_id(MTL_ID()); } virtual ~ObDirectLoadSSTableBuilder() = default; int init(const ObDirectLoadSSTableBuildParam ¶m); diff --git a/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp b/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp index 69b24673ce..e9faf7ff37 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_compactor.cpp @@ -50,6 +50,9 @@ ObDirectLoadSSTableCompactor::ObDirectLoadSSTableCompactor() end_key_allocator_("TLD_ERowkey"), is_inited_(false) { + fragments_.set_tenant_id(MTL_ID()); + start_key_allocator_.set_tenant_id(MTL_ID()); + end_key_allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTableCompactor::~ObDirectLoadSSTableCompactor() @@ -67,8 +70,6 @@ int ObDirectLoadSSTableCompactor::init(const ObDirectLoadSSTableCompactParam &pa LOG_WARN("invalid args", KR(ret), K(param)); } else { param_ = param; - start_key_allocator_.set_tenant_id(MTL_ID()); - end_key_allocator_.set_tenant_id(MTL_ID()); start_key_.set_min_rowkey(); end_key_.set_min_rowkey(); is_inited_ = true; diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp index c4935040b0..48af4a83a3 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.cpp @@ -59,6 +59,9 @@ ObDirectLoadSSTableScanMerge::ObDirectLoadSSTableScanMerge() rows_merger_(nullptr), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); + scanners_.set_tenant_id(MTL_ID()); + rows_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTableScanMerge::~ObDirectLoadSSTableScanMerge() @@ -105,7 +108,6 @@ int ObDirectLoadSSTableScanMerge::init(const ObDirectLoadSSTableScanMergeParam & ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(sstable_array), K(range)); } else { - allocator_.set_tenant_id(MTL_ID()); // construct scanners for (int64_t i = 0; OB_SUCC(ret) && i < sstable_array.count(); ++i) { ObDirectLoadSSTable *sstable = sstable_array.at(i); diff --git a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h index 5ac37e5989..8c13829a92 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h +++ b/src/storage/direct_load/ob_direct_load_sstable_scan_merge.h @@ -75,7 +75,7 @@ private: const blocksstable::ObStorageDatumUtils *datum_utils_; ObDirectLoadDMLRowHandler *dml_row_handler_; const blocksstable::ObDatumRange *range_; - common::ObSEArray scanners_; + common::ObArray scanners_; int64_t *consumers_; int64_t consumer_cnt_; LoserTreeCompare compare_; diff --git a/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp b/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp index 1181bb29f2..473f25bb8d 100644 --- a/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp +++ b/src/storage/direct_load/ob_direct_load_sstable_scanner.cpp @@ -48,6 +48,7 @@ ObDirectLoadSSTableScanner::ObDirectLoadSSTableScanner() allocator_("TLD_SSTScanner"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadSSTableScanner::~ObDirectLoadSSTableScanner() {} @@ -75,7 +76,6 @@ int ObDirectLoadSSTableScanner::init(ObDirectLoadSSTable *sstable, sstable_ = sstable; query_range_ = ⦥ datum_utils_ = datum_utils; - allocator_.set_tenant_id(MTL_ID()); const int64_t buf_size = sstable_data_block_size_; if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -384,7 +384,8 @@ int ObDirectLoadSSTableScanner::read_buffer(uint64_t offset, uint64_t size) int64_t read_size = size; if (large_buf_ == nullptr) { int64_t large_buf_size = OB_SERVER_BLOCK_MGR.get_macro_block_size(); - if (OB_ISNULL(large_buf_ = static_cast(ob_malloc(large_buf_size, ObModIds::OB_SQL_LOAD_DATA)))) { + ObMemAttr attr(MTL_ID(), "TLD_LargeBuf"); + if (OB_ISNULL(large_buf_ = static_cast(ob_malloc(large_buf_size, attr)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(large_buf_size)); } @@ -399,7 +400,8 @@ int ObDirectLoadSSTableScanner::read_buffer(uint64_t offset, uint64_t size) int ObDirectLoadSSTableScanner::get_large_buffer(int64_t buf_size) { int ret = OB_SUCCESS; - if (OB_ISNULL(large_buf_ = static_cast(ob_malloc(buf_size, ObModIds::OB_SQL_LOAD_DATA)))) { + ObMemAttr attr(MTL_ID(), "TLD_LargeBuf"); + if (OB_ISNULL(large_buf_ = static_cast(ob_malloc(buf_size, attr)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); } else { @@ -575,6 +577,7 @@ ObDirectLoadIndexBlockMetaIterator::ObDirectLoadIndexBlockMetaIterator() allocator_("TLD_IDBMeta"), is_inited_(false) { + allocator_.set_tenant_id(MTL_ID()); } ObDirectLoadIndexBlockMetaIterator::~ObDirectLoadIndexBlockMetaIterator() {} @@ -614,7 +617,6 @@ int ObDirectLoadIndexBlockMetaIterator::init(ObDirectLoadSSTable *sstable) } if (OB_SUCC(ret)) { int64_t buf_size = OB_SERVER_BLOCK_MGR.get_macro_block_size(); - allocator_.set_tenant_id(tenant_id); if (OB_ISNULL(buf_ = static_cast(allocator_.alloc(buf_size)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); diff --git a/src/storage/direct_load/ob_direct_load_table_builder_allocator.h b/src/storage/direct_load/ob_direct_load_table_builder_allocator.h index 3b25f73f7e..c88af6622f 100644 --- a/src/storage/direct_load/ob_direct_load_table_builder_allocator.h +++ b/src/storage/direct_load/ob_direct_load_table_builder_allocator.h @@ -51,9 +51,7 @@ public: assert_in_own_thread(); T *t = nullptr; void *buf = nullptr; - ObMemAttr attr; - attr.label_ = "TLD_TB_Alloc"; - attr.tenant_id_ = MTL_ID(); + ObMemAttr attr(MTL_ID(), "TLD_TB_Alloc"); if (OB_NOT_NULL(buf = ob_malloc(sizeof(Item) + sizeof(T), attr))) { Item *item = new (buf) Item; t = new (item->buf_) T(args...); diff --git a/src/storage/direct_load/ob_direct_load_table_store.cpp b/src/storage/direct_load/ob_direct_load_table_store.cpp index 84eb928349..df890df392 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.cpp +++ b/src/storage/direct_load/ob_direct_load_table_store.cpp @@ -241,7 +241,6 @@ int ObDirectLoadTableStore::init(const ObDirectLoadTableStoreParam ¶m) } else { const uint64_t tenant_id = MTL_ID(); param_ = param; - allocator_.set_tenant_id(tenant_id); if (OB_FAIL(tablet_index_.create(64, "TLD_TS_PartMap", "TLD_TS_PartMap", tenant_id))) { LOG_WARN("fail to create hashmap", KR(ret)); } else { @@ -349,7 +348,8 @@ int ObDirectLoadTableStore::get_tables(ObIArray & LOG_WARN("ObDirectLoadTableStore not init", KR(ret), KP(this)); } else { table_array.reset(); - ObSEArray bucket_table_array; + ObArray bucket_table_array; + bucket_table_array.set_tenant_id(MTL_ID()); for (int64_t i = 0; OB_SUCC(ret) && i < bucket_ptr_array_.count(); ++i) { bucket_table_array.reset(); if (OB_FAIL(bucket_ptr_array_.at(i)->get_tables(bucket_table_array, allocator))) { diff --git a/src/storage/direct_load/ob_direct_load_table_store.h b/src/storage/direct_load/ob_direct_load_table_store.h index b2a6fde2a7..13b50da147 100644 --- a/src/storage/direct_load/ob_direct_load_table_store.h +++ b/src/storage/direct_load/ob_direct_load_table_store.h @@ -78,7 +78,11 @@ class ObDirectLoadTableStore { public: const static constexpr int64_t MAX_BUCKET_CNT = 1024; - ObDirectLoadTableStore() : allocator_("TLD_TSBucket"), is_inited_(false) {} + ObDirectLoadTableStore() : allocator_("TLD_TSBucket"), is_inited_(false) + { + allocator_.set_tenant_id(MTL_ID()); + bucket_ptr_array_.set_tenant_id(MTL_ID()); + } ~ObDirectLoadTableStore(); int init(const ObDirectLoadTableStoreParam ¶m); int append_row(const common::ObTabletID &tablet_id, const table::ObTableLoadSequenceNo &seq_no, const blocksstable::ObDatumRow &datum_row); diff --git a/src/storage/direct_load/ob_direct_load_tmp_file.cpp b/src/storage/direct_load/ob_direct_load_tmp_file.cpp index 52e4787a1e..694808317c 100644 --- a/src/storage/direct_load/ob_direct_load_tmp_file.cpp +++ b/src/storage/direct_load/ob_direct_load_tmp_file.cpp @@ -74,6 +74,7 @@ int ObDirectLoadTmpFileHandle::set_file(ObDirectLoadTmpFile *tmp_file) ObDirectLoadTmpFilesHandle::ObDirectLoadTmpFilesHandle() { + tmp_file_list_.set_tenant_id(MTL_ID()); } ObDirectLoadTmpFilesHandle::~ObDirectLoadTmpFilesHandle()