From 5d11cc123e0360cdbb9bbb9050949e4e24c90004 Mon Sep 17 00:00:00 2001 From: Fengjingkun Date: Mon, 6 Nov 2023 05:39:31 +0000 Subject: [PATCH] optimize cg merge info memory alloc --- .../index_block/ob_index_block_builder.h | 1 + src/storage/column_store/ob_co_merge_ctx.cpp | 27 +++++- src/storage/column_store/ob_co_merge_ctx.h | 5 + src/storage/column_store/ob_co_merge_dag.cpp | 93 +++++++++++-------- src/storage/column_store/ob_co_merge_dag.h | 29 +++--- 5 files changed, 103 insertions(+), 52 deletions(-) diff --git a/src/storage/blocksstable/index_block/ob_index_block_builder.h b/src/storage/blocksstable/index_block/ob_index_block_builder.h index ca0dee736a..c2c358945b 100644 --- a/src/storage/blocksstable/index_block/ob_index_block_builder.h +++ b/src/storage/blocksstable/index_block/ob_index_block_builder.h @@ -496,6 +496,7 @@ public: const int64_t nested_size = OB_DEFAULT_MACRO_BLOCK_SIZE, const int64_t nested_offset = 0); int init_meta_iter(ObMacroMetaIter &iter); + bool is_inited() const { return is_inited_; } TO_STRING_KV(K(roots_.count())); public: static bool check_version_for_small_sstable(const ObDataStoreDesc &index_desc); diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index 0fb7a38555..f012c80dc5 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -211,7 +211,7 @@ int ObCOTabletMergeCtx::prepare_index_builder( ret = OB_ERR_UNEXPECTED; LOG_WARN("schema is invalid", K(ret), K_(static_param)); } else if (FALSE_IT(batch_cg_count = end_cg_idx - start_cg_idx)) { - } else if (reuse_merge_info) { + } else if (reuse_merge_info && is_cg_merge_infos_valid(start_cg_idx, end_cg_idx, false/*check_info_ready*/)) { // reuse old merge info } else if (FALSE_IT(alloc_size = batch_cg_count * sizeof(ObTabletMergeInfo))) { } else if (OB_ISNULL(buf = mem_ctx_.local_alloc(alloc_size))) { @@ -241,6 +241,31 @@ int ObCOTabletMergeCtx::prepare_index_builder( return ret; } +bool ObCOTabletMergeCtx::is_cg_merge_infos_valid( + const uint32_t start_cg_idx, + const uint32_t end_cg_idx, + const bool check_info_ready) const +{ + bool bret = true; + + if (OB_NOT_NULL(cg_merge_info_array_)) { + ObTabletMergeInfo *info_ptr = nullptr; + for (int64_t i = start_cg_idx; bret && i < end_cg_idx; ++i) { + if (OB_ISNULL(info_ptr = cg_merge_info_array_[i])) { + bret = false; + } else if (!check_info_ready) { + // do nothing + } else if (nullptr == info_ptr->get_index_builder() + || !info_ptr->get_index_builder()->is_inited()) { + bret = false; + } + } + } else { + bret = false; + } + return bret; +} + int ObCOTabletMergeCtx::inner_loop_prepare_index_tree( const uint32_t start_cg_idx, const uint32_t end_cg_idx) diff --git a/src/storage/column_store/ob_co_merge_ctx.h b/src/storage/column_store/ob_co_merge_ctx.h index 79abfc0233..9674cd9388 100644 --- a/src/storage/column_store/ob_co_merge_ctx.h +++ b/src/storage/column_store/ob_co_merge_ctx.h @@ -101,6 +101,11 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx const share::ObDagId &dag_id, const ObCompactionTimeGuard &time_guard); int schedule_minor_errsim(bool &schedule_minor) const; + // only used for ObCOMergeBatchExeDag + bool is_cg_merge_infos_valid( + const uint32_t start_cg_idx, + const uint32_t end_cg_idx, + const bool check_info_ready) const; int inner_loop_prepare_index_tree( const uint32_t start_cg_idx, const uint32_t end_cg_idx); diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index 1a74269155..e63153ad6d 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -1,12 +1,15 @@ -//COpyright (c) 2022 OceanBase -// OceanBase is licensed under Mulan PubL v2. -// You can use this software according to the terms and conditions of the Mulan PubL v2. -// You may obtain a copy of Mulan PubL v2 at: -// http://license.coscl.org.cn/MulanPubL-2.0 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PubL v2 for more details. +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + #define USING_LOG_PREFIX STORAGE_COMPACTION #include "storage/column_store/ob_co_merge_dag.h" #include "storage/column_store/ob_column_oriented_merger.h" @@ -406,6 +409,7 @@ int ObCOMergeScheduleTask::process() */ ObCOMergeBatchExeDag::ObCOMergeBatchExeDag() : ObCOMergeDag(ObDagType::DAG_TYPE_CO_MERGE_BATCH_EXECUTE), + alloc_merge_info_lock_(), start_cg_idx_(0), end_cg_idx_(0), retry_create_task_(false), @@ -478,9 +482,6 @@ int ObCOMergeBatchExeDag::create_first_task() } else if (OB_UNLIKELY(!ctx->is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("co merge ctx is not ready", K(ret), KPC(ctx)); - } else if (FALSE_IT(SET_MEM_CTX(ctx->mem_ctx_))) { - } else if (OB_FAIL(ctx->prepare_index_builder(start_cg_idx_, end_cg_idx_, retry_create_task_))) { - STORAGE_LOG(WARN, "failed to prepare index builder ", K(ret), K(start_cg_idx_), K(end_cg_idx_)); } else if (OB_FAIL(create_task(nullptr/*parent*/, execute_task, 0/*task_idx*/, *ctx, *dag_net))) { LOG_WARN("fail to create merge task", K(ret), KPC(dag_net)); } else if (OB_ISNULL(execute_task)) { @@ -686,26 +687,12 @@ int ObCOMergeBatchExeTask::init( } else if (OB_UNLIKELY(idx < 0)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("idx is invalid", K(ret), K(idx)); - } else if (FALSE_IT(allocator_.bind_mem_ctx(ctx.mem_ctx_))) { } else { - ObCOMergeBatchExeDag *merge_dag = static_cast(dag_); - void *buf = nullptr; - if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to alloc memory for major merger", K(ret)); - } else { - merger_ = new (buf) ObCOMerger(allocator_, ctx.static_param_, - merge_dag->get_start_cg_idx(), merge_dag->get_end_cg_idx(), ctx.static_param_.is_rebuild_column_store_); - if (ctx.static_param_.is_rebuild_column_store_) { - FLOG_INFO("rebuild column store data", K(ret), K(ctx.get_tablet_id())); - } - } - if (OB_SUCC(ret)) { - idx_ = idx; - ctx_ = &ctx; - dag_net_ = &dag_net; - is_inited_ = true; - } + allocator_.bind_mem_ctx(ctx.mem_ctx_); + idx_ = idx; + ctx_ = &ctx; + dag_net_ = &dag_net; + is_inited_ = true; } return ret; } @@ -713,24 +700,51 @@ int ObCOMergeBatchExeTask::init( int ObCOMergeBatchExeTask::process() { int ret = OB_SUCCESS; + ObCOMergeBatchExeDag *exe_dag = static_cast(dag_); + if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("task is not inited", K(ret), K_(is_inited)); - } else if (OB_ISNULL(merger_)) { - ret = OB_ERR_SYS; - STORAGE_LOG(WARN, "Unexpected null partition merger", K(ret)); + } else if (OB_UNLIKELY(nullptr == ctx_ || nullptr == dag_net_ || nullptr != merger_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("get unexpected exe task", K(ret), K(ctx_), K(dag_net_), KPC(merger_)); + } else if (FALSE_IT(SET_MEM_CTX(ctx_->mem_ctx_))) { } else { + ObSpinLockGuard lock_guard(exe_dag->alloc_merge_info_lock_); + if (ctx_->is_cg_merge_infos_valid(exe_dag->get_start_cg_idx(), exe_dag->get_end_cg_idx(), true/*check info ready*/)) { + // do nothing + } else if (OB_FAIL(ctx_->prepare_index_builder(exe_dag->get_start_cg_idx(), + exe_dag->get_end_cg_idx(), + exe_dag->get_retry_create_task()))) { + STORAGE_LOG(WARN, "failed to prepare index builder ", K(ret), KPC(exe_dag)); + } + } + + void *buf = nullptr; + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "failed to alloc memory for major merger", K(ret)); + } else { + merger_ = new (buf) ObCOMerger(allocator_, + ctx_->static_param_, + exe_dag->get_start_cg_idx(), + exe_dag->get_end_cg_idx(), + ctx_->static_param_.is_rebuild_column_store_); + if (ctx_->static_param_.is_rebuild_column_store_) { + FLOG_INFO("rebuild column store data", K(ret), K(ctx_->get_tablet_id())); + } + merge_start(); if (OB_FAIL(merger_->merge_partition(*ctx_, idx_))) { STORAGE_LOG(WARN, "failed to merge partition", K(ret)); } else { FLOG_INFO("merge macro blocks ok", K(idx_), "task", *this, KPC(dag_)); } + if (nullptr != merger_) { + merger_->reset(); + } } - if (nullptr != merger_) { - merger_->reset(); - } - return ret; } @@ -1185,8 +1199,7 @@ int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag) } } // refine merge_batch_size_ with tenant memory - const bool enable_adaptive_merge_schedule = false; // TODO(@DanLing) impl tenant config - if (OB_SUCC(ret) && enable_adaptive_merge_schedule) { + if (OB_SUCC(ret) && MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) { try_update_merge_batch_size(co_merge_ctx_->array_count_); } diff --git a/src/storage/column_store/ob_co_merge_dag.h b/src/storage/column_store/ob_co_merge_dag.h index 8e1723dde9..873473247e 100644 --- a/src/storage/column_store/ob_co_merge_dag.h +++ b/src/storage/column_store/ob_co_merge_dag.h @@ -1,18 +1,22 @@ -//COpyright (c) 2022 OceanBase -// OceanBase is licensed under Mulan PubL v2. -// You can use this software according to the terms and conditions of the Mulan PubL v2. -// You may obtain a copy of Mulan PubL v2 at: -// http://license.coscl.org.cn/MulanPubL-2.0 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -// See the Mulan PubL v2 for more details. +/** + * Copyright (c) 2023 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + #ifndef OB_STORAGE_COLUMN_STORE_CO_MERGE_DAG_H_ #define OB_STORAGE_COLUMN_STORE_CO_MERGE_DAG_H_ #include "share/scheduler/ob_tenant_dag_scheduler.h" #include "storage/compaction/ob_tablet_merge_task.h" #include "storage/compaction/ob_partition_merger.h" #include "storage/column_store/ob_co_merge_ctx.h" +#include "lib/lock/ob_spin_lock.h" namespace oceanbase { @@ -138,6 +142,7 @@ public: virtual int diagnose_compaction_info(compaction::ObDiagnoseTabletCompProgress &progress) override; uint32_t get_start_cg_idx() const { return start_cg_idx_; } uint32_t get_end_cg_idx() const { return end_cg_idx_; } + bool get_retry_create_task() const { return retry_create_task_; } ObCompactionTimeGuard &get_time_guard() { return time_guard_; } OB_INLINE void dag_time_guard_click(const uint16_t event) { @@ -149,9 +154,11 @@ public: int create_sstable_after_merge(); INHERIT_TO_STRING_KV("ObTabletMergeDag", ObTabletMergeDag, K_(dag_net_id), K_(start_cg_idx), - K_(end_cg_idx)); + K_(end_cg_idx), K_(retry_create_task)); private: int prepare_merge_progress(); +public: + common::ObSpinLock alloc_merge_info_lock_; // alloc && check cg_merge_infos private: share::ObDagId dag_net_id_; uint32_t start_cg_idx_; @@ -173,7 +180,7 @@ public: protected: virtual int process() override; private: - virtual void merge_start(); + void merge_start(); private: bool is_inited_; int64_t idx_;