optimize cg merge info memory alloc

This commit is contained in:
Fengjingkun
2023-11-06 05:39:31 +00:00
committed by ob-robot
parent 6bcea4de73
commit 5d11cc123e
5 changed files with 103 additions and 52 deletions

View File

@ -496,6 +496,7 @@ public:
const int64_t nested_size = OB_DEFAULT_MACRO_BLOCK_SIZE,
const int64_t nested_offset = 0);
int init_meta_iter(ObMacroMetaIter &iter);
bool is_inited() const { return is_inited_; }
TO_STRING_KV(K(roots_.count()));
public:
static bool check_version_for_small_sstable(const ObDataStoreDesc &index_desc);

View File

@ -211,7 +211,7 @@ int ObCOTabletMergeCtx::prepare_index_builder(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("schema is invalid", K(ret), K_(static_param));
} else if (FALSE_IT(batch_cg_count = end_cg_idx - start_cg_idx)) {
} else if (reuse_merge_info) {
} else if (reuse_merge_info && is_cg_merge_infos_valid(start_cg_idx, end_cg_idx, false/*check_info_ready*/)) {
// reuse old merge info
} else if (FALSE_IT(alloc_size = batch_cg_count * sizeof(ObTabletMergeInfo))) {
} else if (OB_ISNULL(buf = mem_ctx_.local_alloc(alloc_size))) {
@ -241,6 +241,31 @@ int ObCOTabletMergeCtx::prepare_index_builder(
return ret;
}
bool ObCOTabletMergeCtx::is_cg_merge_infos_valid(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx,
const bool check_info_ready) const
{
bool bret = true;
if (OB_NOT_NULL(cg_merge_info_array_)) {
ObTabletMergeInfo *info_ptr = nullptr;
for (int64_t i = start_cg_idx; bret && i < end_cg_idx; ++i) {
if (OB_ISNULL(info_ptr = cg_merge_info_array_[i])) {
bret = false;
} else if (!check_info_ready) {
// do nothing
} else if (nullptr == info_ptr->get_index_builder()
|| !info_ptr->get_index_builder()->is_inited()) {
bret = false;
}
}
} else {
bret = false;
}
return bret;
}
int ObCOTabletMergeCtx::inner_loop_prepare_index_tree(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx)

View File

@ -101,6 +101,11 @@ struct ObCOTabletMergeCtx : public ObBasicTabletMergeCtx
const share::ObDagId &dag_id,
const ObCompactionTimeGuard &time_guard);
int schedule_minor_errsim(bool &schedule_minor) const;
// only used for ObCOMergeBatchExeDag
bool is_cg_merge_infos_valid(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx,
const bool check_info_ready) const;
int inner_loop_prepare_index_tree(
const uint32_t start_cg_idx,
const uint32_t end_cg_idx);

View File

@ -1,12 +1,15 @@
//COpyright (c) 2022 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX STORAGE_COMPACTION
#include "storage/column_store/ob_co_merge_dag.h"
#include "storage/column_store/ob_column_oriented_merger.h"
@ -406,6 +409,7 @@ int ObCOMergeScheduleTask::process()
*/
ObCOMergeBatchExeDag::ObCOMergeBatchExeDag()
: ObCOMergeDag(ObDagType::DAG_TYPE_CO_MERGE_BATCH_EXECUTE),
alloc_merge_info_lock_(),
start_cg_idx_(0),
end_cg_idx_(0),
retry_create_task_(false),
@ -478,9 +482,6 @@ int ObCOMergeBatchExeDag::create_first_task()
} else if (OB_UNLIKELY(!ctx->is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("co merge ctx is not ready", K(ret), KPC(ctx));
} else if (FALSE_IT(SET_MEM_CTX(ctx->mem_ctx_))) {
} else if (OB_FAIL(ctx->prepare_index_builder(start_cg_idx_, end_cg_idx_, retry_create_task_))) {
STORAGE_LOG(WARN, "failed to prepare index builder ", K(ret), K(start_cg_idx_), K(end_cg_idx_));
} else if (OB_FAIL(create_task(nullptr/*parent*/, execute_task, 0/*task_idx*/, *ctx, *dag_net))) {
LOG_WARN("fail to create merge task", K(ret), KPC(dag_net));
} else if (OB_ISNULL(execute_task)) {
@ -686,26 +687,12 @@ int ObCOMergeBatchExeTask::init(
} else if (OB_UNLIKELY(idx < 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("idx is invalid", K(ret), K(idx));
} else if (FALSE_IT(allocator_.bind_mem_ctx(ctx.mem_ctx_))) {
} else {
ObCOMergeBatchExeDag *merge_dag = static_cast<ObCOMergeBatchExeDag*>(dag_);
void *buf = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc memory for major merger", K(ret));
} else {
merger_ = new (buf) ObCOMerger(allocator_, ctx.static_param_,
merge_dag->get_start_cg_idx(), merge_dag->get_end_cg_idx(), ctx.static_param_.is_rebuild_column_store_);
if (ctx.static_param_.is_rebuild_column_store_) {
FLOG_INFO("rebuild column store data", K(ret), K(ctx.get_tablet_id()));
}
}
if (OB_SUCC(ret)) {
idx_ = idx;
ctx_ = &ctx;
dag_net_ = &dag_net;
is_inited_ = true;
}
allocator_.bind_mem_ctx(ctx.mem_ctx_);
idx_ = idx;
ctx_ = &ctx;
dag_net_ = &dag_net;
is_inited_ = true;
}
return ret;
}
@ -713,24 +700,51 @@ int ObCOMergeBatchExeTask::init(
int ObCOMergeBatchExeTask::process()
{
int ret = OB_SUCCESS;
ObCOMergeBatchExeDag *exe_dag = static_cast<ObCOMergeBatchExeDag*>(dag_);
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("task is not inited", K(ret), K_(is_inited));
} else if (OB_ISNULL(merger_)) {
ret = OB_ERR_SYS;
STORAGE_LOG(WARN, "Unexpected null partition merger", K(ret));
} else if (OB_UNLIKELY(nullptr == ctx_ || nullptr == dag_net_ || nullptr != merger_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected exe task", K(ret), K(ctx_), K(dag_net_), KPC(merger_));
} else if (FALSE_IT(SET_MEM_CTX(ctx_->mem_ctx_))) {
} else {
ObSpinLockGuard lock_guard(exe_dag->alloc_merge_info_lock_);
if (ctx_->is_cg_merge_infos_valid(exe_dag->get_start_cg_idx(), exe_dag->get_end_cg_idx(), true/*check info ready*/)) {
// do nothing
} else if (OB_FAIL(ctx_->prepare_index_builder(exe_dag->get_start_cg_idx(),
exe_dag->get_end_cg_idx(),
exe_dag->get_retry_create_task()))) {
STORAGE_LOG(WARN, "failed to prepare index builder ", K(ret), KPC(exe_dag));
}
}
void *buf = nullptr;
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObCOMerger)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc memory for major merger", K(ret));
} else {
merger_ = new (buf) ObCOMerger(allocator_,
ctx_->static_param_,
exe_dag->get_start_cg_idx(),
exe_dag->get_end_cg_idx(),
ctx_->static_param_.is_rebuild_column_store_);
if (ctx_->static_param_.is_rebuild_column_store_) {
FLOG_INFO("rebuild column store data", K(ret), K(ctx_->get_tablet_id()));
}
merge_start();
if (OB_FAIL(merger_->merge_partition(*ctx_, idx_))) {
STORAGE_LOG(WARN, "failed to merge partition", K(ret));
} else {
FLOG_INFO("merge macro blocks ok", K(idx_), "task", *this, KPC(dag_));
}
if (nullptr != merger_) {
merger_->reset();
}
}
if (nullptr != merger_) {
merger_->reset();
}
return ret;
}
@ -1185,8 +1199,7 @@ int ObCOMergeDagNet::inner_create_and_schedule_dags(ObIDag *parent_dag)
}
}
// refine merge_batch_size_ with tenant memory
const bool enable_adaptive_merge_schedule = false; // TODO(@DanLing) impl tenant config
if (OB_SUCC(ret) && enable_adaptive_merge_schedule) {
if (OB_SUCC(ret) && MTL(ObTenantTabletScheduler *)->enable_adaptive_merge_schedule()) {
try_update_merge_batch_size(co_merge_ctx_->array_count_);
}

View File

@ -1,18 +1,22 @@
//COpyright (c) 2022 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OB_STORAGE_COLUMN_STORE_CO_MERGE_DAG_H_
#define OB_STORAGE_COLUMN_STORE_CO_MERGE_DAG_H_
#include "share/scheduler/ob_tenant_dag_scheduler.h"
#include "storage/compaction/ob_tablet_merge_task.h"
#include "storage/compaction/ob_partition_merger.h"
#include "storage/column_store/ob_co_merge_ctx.h"
#include "lib/lock/ob_spin_lock.h"
namespace oceanbase
{
@ -138,6 +142,7 @@ public:
virtual int diagnose_compaction_info(compaction::ObDiagnoseTabletCompProgress &progress) override;
uint32_t get_start_cg_idx() const { return start_cg_idx_; }
uint32_t get_end_cg_idx() const { return end_cg_idx_; }
bool get_retry_create_task() const { return retry_create_task_; }
ObCompactionTimeGuard &get_time_guard() { return time_guard_; }
OB_INLINE void dag_time_guard_click(const uint16_t event)
{
@ -149,9 +154,11 @@ public:
int create_sstable_after_merge();
INHERIT_TO_STRING_KV("ObTabletMergeDag", ObTabletMergeDag, K_(dag_net_id), K_(start_cg_idx),
K_(end_cg_idx));
K_(end_cg_idx), K_(retry_create_task));
private:
int prepare_merge_progress();
public:
common::ObSpinLock alloc_merge_info_lock_; // alloc && check cg_merge_infos
private:
share::ObDagId dag_net_id_;
uint32_t start_cg_idx_;
@ -173,7 +180,7 @@ public:
protected:
virtual int process() override;
private:
virtual void merge_start();
void merge_start();
private:
bool is_inited_;
int64_t idx_;