From 5358c920cf37599f99d93f7e15db7e64427b9cd6 Mon Sep 17 00:00:00 2001 From: Fengjingkun Date: Fri, 6 Dec 2024 08:49:20 +0000 Subject: [PATCH] opt the display of co merge history info --- src/objit/src/ob_llvm_di_helper.cpp | 6 +-- .../cs_encoding/ob_micro_block_cs_encoder.cpp | 2 +- .../blocksstable/ob_macro_block_writer.cpp | 1 - .../blocksstable/ob_macro_block_writer.h | 1 + src/storage/column_store/ob_co_merge_ctx.cpp | 29 +++++++++---- src/storage/column_store/ob_co_merge_dag.cpp | 3 +- src/storage/column_store/ob_co_merge_dag.h | 5 ++- .../compaction/ob_basic_tablet_merge_ctx.cpp | 13 ++++-- .../compaction/ob_basic_tablet_merge_ctx.h | 3 +- .../compaction/ob_medium_compaction_func.cpp | 8 ++-- .../ob_partition_merge_progress.cpp | 19 +++++++-- .../compaction/ob_partition_merger.cpp | 42 +++++++++++++++---- src/storage/compaction/ob_partition_merger.h | 14 +++---- .../ob_progressive_merge_helper.cpp | 2 +- 14 files changed, 104 insertions(+), 44 deletions(-) diff --git a/src/objit/src/ob_llvm_di_helper.cpp b/src/objit/src/ob_llvm_di_helper.cpp index 6ad76afe1..618203363 100644 --- a/src/objit/src/ob_llvm_di_helper.cpp +++ b/src/objit/src/ob_llvm_di_helper.cpp @@ -320,9 +320,9 @@ int ObLLVMDIHelper::create_struct_type( LOG_WARN("name or jc or file or scope is NULL", K(ret), K(name), K(jc_), K(file), K(scope), K(line)); } else { - SmallVector element_types; - for (int i = 0; OB_SUCC(ret) && i < member_types.count(); i++) { - if (OB_ISNULL(member_types.at(i).get_v())) { + SmallVector element_types; + for (int i = 0; OB_SUCC(ret) && i < member_types.count(); i++) { + if (OB_ISNULL(member_types.at(i).get_v())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("member type is NULL", K(ret), K(i), K(member_types.count())); } else { diff --git a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_encoder.cpp b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_encoder.cpp index ca2a7c3be..8f8475760 100644 --- a/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_encoder.cpp +++ b/src/storage/blocksstable/cs_encoding/ob_micro_block_cs_encoder.cpp @@ -441,7 +441,7 @@ int ObMicroBlockCSEncoder::append_row(const ObDatumRow &row) if (OB_UNLIKELY(appended_row_count_ >= ObCSEncodingUtil::MAX_MICRO_BLOCK_ROW_CNT || appended_row_count_ > ctx_.encoding_granularity_)) { ret = OB_BUF_NOT_ENOUGH; - LOG_INFO("Try to encode more rows than maximum of row cnt in header, force to build a block", + LOG_DEBUG("Try to encode more rows than maximum of row cnt in header, force to build a block", K_(appended_row_count), K(row), K(ctx_.encoding_granularity_)); } else if (OB_FAIL(copy_and_append_row_(row, store_size))) { if (OB_UNLIKELY(OB_BUF_NOT_ENOUGH != ret)) { diff --git a/src/storage/blocksstable/ob_macro_block_writer.cpp b/src/storage/blocksstable/ob_macro_block_writer.cpp index 176309d74..df5307990 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.cpp +++ b/src/storage/blocksstable/ob_macro_block_writer.cpp @@ -609,7 +609,6 @@ int ObMacroBlockWriter::append_row(const ObDatumRow &row, const ObMacroBlockDesc } else if (OB_FAIL(try_active_flush_macro_block())) { STORAGE_LOG(WARN, "Fail to try_active_flush_macro_block", K(ret)); } else { - ++merge_block_info_.incremental_row_count_; STORAGE_LOG(DEBUG, "Success to append row, ", "tablet_id", data_store_desc_->get_tablet_id(), K(row)); } return ret; diff --git a/src/storage/blocksstable/ob_macro_block_writer.h b/src/storage/blocksstable/ob_macro_block_writer.h index a30388d07..7a6db2fce 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.h +++ b/src/storage/blocksstable/ob_macro_block_writer.h @@ -192,6 +192,7 @@ public: const int64_t verify_level = MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION); inline int64_t get_macro_data_size() const { return macro_blocks_[current_index_].get_data_size() + micro_writer_->get_block_size(); } const compaction::ObMergeBlockInfo& get_merge_block_info() const { return merge_block_info_; } + void inc_incremental_row_count() { ++merge_block_info_.incremental_row_count_; } protected: virtual int build_micro_block(); virtual int try_switch_macro_block(); diff --git a/src/storage/column_store/ob_co_merge_ctx.cpp b/src/storage/column_store/ob_co_merge_ctx.cpp index ad999e7f6..85c128429 100644 --- a/src/storage/column_store/ob_co_merge_ctx.cpp +++ b/src/storage/column_store/ob_co_merge_ctx.cpp @@ -376,14 +376,25 @@ int ObCOTabletMergeCtx::cal_merge_param() int ObCOTabletMergeCtx::collect_running_info() { int ret = OB_SUCCESS; - dag_net_merge_history_.static_info_.shallow_copy(static_history_); - dag_net_merge_history_.static_info_.is_fake_ = true; - dag_net_merge_history_.running_info_.merge_start_time_ = dag_net_.get_start_time(); - // add a fake merge info into history with only dag_net time_guard - (void) ObBasicTabletMergeCtx::add_sstable_merge_info(dag_net_merge_history_, dag_net_.get_dag_id(), dag_net_.hash(), - info_collector_.time_guard_, nullptr/*sstable*/, - &static_param_.snapshot_info_, - 0/*start_cg_idx*/, array_count_/*end_cg_idx*/); + const int64_t batch_exe_dag_cnt = dag_net_.get_batch_dag_count(); + + if (is_build_row_store() || 1 >= batch_exe_dag_cnt) { + // no need to report dag net merge history + } else { + dag_net_merge_history_.static_info_.shallow_copy(static_history_); + dag_net_merge_history_.static_info_.is_fake_ = true; + dag_net_merge_history_.running_info_.merge_start_time_ = dag_net_.get_start_time(); + // add a fake merge info into history with only dag_net time_guard + (void) ObBasicTabletMergeCtx::add_sstable_merge_info(dag_net_merge_history_, + dag_net_.get_dag_id(), + dag_net_.hash(), + info_collector_.time_guard_, + nullptr/*sstable*/, + &static_param_.snapshot_info_, + 0/*start_cg_idx*/, + array_count_/*end_cg_idx*/, + batch_exe_dag_cnt); + } return ret; } @@ -428,7 +439,7 @@ int ObCOTabletMergeCtx::collect_running_info( LOG_WARN("failed to collect running info in batch"); } else { const ObSSTable *new_sstable = static_cast(new_table); - add_sstable_merge_info(cg_merge_info_array_[start_cg_idx]->get_merge_history(), + ObBasicTabletMergeCtx::add_sstable_merge_info(cg_merge_info_array_[start_cg_idx]->get_merge_history(), dag_id, hash, time_guard, new_sstable, &static_param_.snapshot_info_, start_cg_idx, end_cg_idx); } return ret; diff --git a/src/storage/column_store/ob_co_merge_dag.cpp b/src/storage/column_store/ob_co_merge_dag.cpp index 7082036bc..e2738e0ec 100644 --- a/src/storage/column_store/ob_co_merge_dag.cpp +++ b/src/storage/column_store/ob_co_merge_dag.cpp @@ -500,7 +500,6 @@ int ObCOMergeBatchExeDag::init_by_param(const share::ObIDagInitParam *param) int ObCOMergeBatchExeDag::create_first_task() { int ret = OB_SUCCESS; - int tmp_ret = OB_SUCCESS; ObCOMergeBatchExeTask *execute_task = nullptr; ObCOMergeBatchFinishTask *finish_task = nullptr; ObCOMergeDagNet *dag_net = static_cast(get_dag_net()); @@ -941,6 +940,7 @@ int ObCOMergeBatchFinishTask::process() } else if (OB_FAIL(execute_dag->create_sstable_after_merge())) { LOG_WARN("failed to create sstable after merge", K(ret), KPC(execute_dag)); } else { + dag_net_->inc_batch_dag_count(); FLOG_INFO("co batch sstable merge finish", K(ret), "start_cg sstable_merge_info", ctx_->cg_merge_info_array_[execute_dag->get_start_cg_idx()]->get_merge_history(), "time_guard", execute_dag->get_time_guard(), @@ -1089,6 +1089,7 @@ ObCOMergeDagNet::ObCOMergeDagNet() batch_reduced_(false), ctx_lock_(), merge_batch_size_(ObCOTabletMergeCtx::DEFAULT_CG_MERGE_BATCH_SIZE), + batch_dag_cnt_(0), merge_status_(COMergeStatus::NOT_INIT), basic_param_(), tmp_allocator_("CoDagNet", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID(), ObCtxIds::MERGE_NORMAL_CTX_ID), diff --git a/src/storage/column_store/ob_co_merge_dag.h b/src/storage/column_store/ob_co_merge_dag.h index dba49ab09..dbcba01d2 100644 --- a/src/storage/column_store/ob_co_merge_dag.h +++ b/src/storage/column_store/ob_co_merge_dag.h @@ -294,6 +294,8 @@ public: int swap_tablet_after_minor(); ObCOTabletMergeCtx *get_merge_ctx() const { return co_merge_ctx_; } const ObCOMergeDagParam& get_dag_param() const { return basic_param_; } + int64_t get_batch_dag_count() const { return ATOMIC_LOAD(&batch_dag_cnt_); } + void inc_batch_dag_count() { ATOMIC_INC(&batch_dag_cnt_); } void collect_running_info(const uint32_t start_cg_idx, const uint32_t end_cg_idx, const int64_t hash, const share::ObDagId &dag_id, const ObCompactionTimeGuard &time_guard); template @@ -304,7 +306,7 @@ public: share::ObIDag *parent = nullptr, const bool add_scheduler_flag = true); INHERIT_TO_STRING_KV("ObIDagNet", ObIDagNet, K_(is_inited), K_(merge_status), K_(finish_added), - K_(merge_batch_size), K_(basic_param), KP_(finish_dag)); + K_(merge_batch_size), K_(batch_dag_cnt), K_(basic_param), KP_(finish_dag)); private: static const int64_t DELAY_SCHEDULE_FINISH_DAG_CG_CNT = 150; static const int64_t DEFAULT_MAX_RETRY_TIMES = 2; @@ -347,6 +349,7 @@ private: bool batch_reduced_; // only reduce batch_size one time in a round // locked by ctx_lock_ lib::ObMutex ctx_lock_; int64_t merge_batch_size_; // will decrease when meet memory allocate failed + int64_t batch_dag_cnt_; // record the batch exec dag cnt COMergeStatus merge_status_; ObCOMergeDagParam basic_param_; common::ObArenaAllocator tmp_allocator_; // TODO(@lixia.yq) temp solution, use allocator on ObIDagNet later diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index bcf1588b2..6999d4a0e 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -769,7 +769,8 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info( const ObSSTable *sstable, const ObStorageSnapshotInfo *snapshot_info, const int64_t start_cg_idx, - const int64_t end_cg_idx) + const int64_t end_cg_idx, + const int64_t batch_exec_dag_cnt) { int tmp_ret = OB_SUCCESS; ObDagWarningInfo warning_info; @@ -802,6 +803,9 @@ void ObBasicTabletMergeCtx::add_sstable_merge_info( running_info.io_percentage_ = io_percentage; } } + if (batch_exec_dag_cnt > 0) { + ADD_COMMENT("CO_DAG_NET batch_cnt", batch_exec_dag_cnt); + } if (running_info.execute_time_ > 30_s && (get_concurrent_cnt() > 1 || end_cg_idx > 0)) { ADD_COMMENT("execute_time", running_info.execute_time_); } @@ -1332,6 +1336,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info() int64_t schema_version = 0; ObStorageSchema *storage_schema = nullptr; bool is_building_index = false; // placeholder + uint64_t min_data_version = 0; if (OB_UNLIKELY(!is_meta_major_merge(get_merge_type()) || nullptr != static_param_.schema_)) { @@ -1344,10 +1349,12 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info() LOG_WARN("failed to get schema service from MTL", K(ret)); } else if (OB_FAIL(tablet->get_schema_version_from_storage_schema(schema_version))){ LOG_WARN("failed to get schema version from tablet", KR(ret), KPC(tablet)); + } else if (OB_FAIL(GET_MIN_DATA_VERSION(MTL_ID(), min_data_version))) { + LOG_WARN("failed to get min data version", K(ret)); } else if (OB_FAIL(ObMediumCompactionScheduleFunc::get_table_schema_to_merge(*schema_service, *tablet, schema_version, - DATA_CURRENT_VERSION, + min_data_version, mem_ctx_.get_allocator(), *storage_schema, is_building_index))) { @@ -1366,7 +1373,7 @@ int ObBasicTabletMergeCtx::get_meta_compaction_info() } if (OB_SUCC(ret)) { - static_param_.data_version_ = DATA_CURRENT_VERSION; + static_param_.data_version_ = min_data_version; static_param_.is_rebuild_column_store_ = false; static_param_.is_schema_changed_ = true; // use MACRO_BLOCK_MERGE_LEVEL static_param_.merge_reason_ = ObAdaptiveMergePolicy::TOMBSTONE_SCENE; diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.h b/src/storage/compaction/ob_basic_tablet_merge_ctx.h index 6f4c545fb..dbc216b50 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.h +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.h @@ -204,7 +204,8 @@ public: const blocksstable::ObSSTable *sstable = nullptr, const ObStorageSnapshotInfo *snapshot_info = nullptr, const int64_t start_cg_idx = 0, - const int64_t end_cg_idx = 0); + const int64_t end_cg_idx = 0, + const int64_t batch_exec_dag_cnt = 0); int generate_participant_table_info(PartTableInfo &info) const; int generate_macro_id_list(char *buf, const int64_t buf_len, const blocksstable::ObSSTable *&sstable) const; /* GET FUNC */ diff --git a/src/storage/compaction/ob_medium_compaction_func.cpp b/src/storage/compaction/ob_medium_compaction_func.cpp index ba6c907da..8f4c7f25a 100644 --- a/src/storage/compaction/ob_medium_compaction_func.cpp +++ b/src/storage/compaction/ob_medium_compaction_func.cpp @@ -742,10 +742,10 @@ int ObMediumCompactionScheduleFunc::check_if_schema_changed( || (ObRowStoreType::DUMMY_ROW_STORE != tablet.get_last_major_latest_row_store_type() && tablet.get_last_major_latest_row_store_type() != schema.row_store_type_)) { is_schema_changed = true; - LOG_INFO("schema changed", K(schema), K(schema), K(full_stored_col_cnt), - "col_cnt_in_sstable", tablet.get_last_major_column_count(), - "compressor_type_in_sstable", tablet.get_last_major_compressor_type(), - "latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type()); + LOG_INFO("schema changed", K(schema), K(full_stored_col_cnt), + "col_cnt_in_sstable", tablet.get_last_major_column_count(), + "compressor_type_in_sstable", tablet.get_last_major_compressor_type(), + "latest_row_store_type_in_sstable", tablet.get_last_major_latest_row_store_type()); } else { is_schema_changed = false; } diff --git a/src/storage/compaction/ob_partition_merge_progress.cpp b/src/storage/compaction/ob_partition_merge_progress.cpp index 027837ce8..414c4db83 100644 --- a/src/storage/compaction/ob_partition_merge_progress.cpp +++ b/src/storage/compaction/ob_partition_merge_progress.cpp @@ -431,10 +431,23 @@ int ObCOMajorMergeProgress::finish_merge_progress() } else if (OB_UNLIKELY(OB_ISNULL(merge_dag_) || typeid(*merge_dag_) != typeid(ObCOMergeBatchExeDag))) { ret = OB_ERR_UNEXPECTED; LOG_WARN("merge_dag has unexpected type", K(ret), KPC_(merge_dag)); - } else if (OB_UNLIKELY(OB_ISNULL(ctx_) || typeid(*ctx_) != typeid(ObCOTabletMergeCtx))) { + } else if (OB_ISNULL(ctx_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx)); - } else { + LOG_WARN("get unexpected null ctx", K(ret), KPC_(ctx)); + } else if (typeid(*ctx_) != typeid(ObCOTabletMergeCtx)) { + if (!GCTX.is_shared_storage_mode()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx)); +#ifdef OB_BUILD_SHARED_STORAGE + } else if (typeid(*ctx_) != typeid(ObCOTabletOutputMergeCtx) + && typeid(*ctx_) != typeid(ObCOTabletValidateMergeCtx)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx has unexpected type", K(ret), KPC_(ctx)); +#endif + } + } + + if (OB_SUCC(ret)) { ObCOMergeBatchExeDag *merge_dag = static_cast(merge_dag_); ObCOTabletMergeCtx *ctx = static_cast(ctx_); if (OB_FAIL(finish_progress(ctx->get_merge_version(), diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index bcc1863bd..63ed31e13 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -356,7 +356,9 @@ int ObPartitionMerger::process(const ObMicroBlock µ_block) return ret; } -int ObPartitionMerger::process(const ObDatumRow &row) +int ObPartitionMerger::process( + const ObDatumRow &row, + bool is_incremental_row) { int ret = OB_SUCCESS; ObICompactionFilter::ObFilterRet filter_ret = ObICompactionFilter::FILTER_RET_MAX; @@ -385,7 +387,7 @@ int ObPartitionMerger::process(const ObDatumRow &row) // drop this row } else if (OB_FAIL(check_row_columns(row))) { STORAGE_LOG(WARN, "Failed to check row columns", K(ret), K(row)); - } else if (OB_FAIL(inner_process(row))) { + } else if (OB_FAIL(inner_process(row, is_incremental_row))) { STORAGE_LOG(WARN, "Failed to inner append row", K(ret)); } else { LOG_DEBUG("append row", K(ret), K(row)); @@ -531,7 +533,9 @@ void write_wrong_row(const ObTabletID &tablet_id, const ObDatumRow &row) } #endif -int ObPartitionMajorMerger::inner_process(const ObDatumRow &row) +int ObPartitionMajorMerger::inner_process( + const ObDatumRow &row, + bool is_incremental_row) { int ret = OB_SUCCESS; const bool is_delete = row.row_flag_.is_delete(); @@ -548,6 +552,8 @@ int ObPartitionMajorMerger::inner_process(const ObDatumRow &row) STORAGE_LOG(WARN, "Failed to get base iter macro", K(ret)); } else if (OB_FAIL(macro_writer_->append_row(row, macro_desc))) { STORAGE_LOG(WARN, "Failed to append row to macro writer", K(ret)); + } else if (is_incremental_row) { + macro_writer_->inc_incremental_row_count(); } } @@ -624,7 +630,7 @@ int ObPartitionMajorMerger::merge_partition( STORAGE_LOG(WARN, "cur row is null, but block opened", K(ret), KPC(iter)); } } else if (OB_FAIL(merge_same_rowkey_iters(minimum_iters_))) { - STORAGE_LOG(WARN, "failed to merge_same_rowkey_iters", K(ret), K(minimum_iters_)); + STORAGE_LOG(WARN, "failed to merge same rowkey iters", K(ret), K(minimum_iters_)); } if (OB_FAIL(ret)) { @@ -681,12 +687,22 @@ int ObPartitionMajorMerger::init_progressive_merge_helper() return ret; } -int ObPartitionMajorMerger::merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) +int ObPartitionMajorMerger::merge_same_rowkey_iters( + MERGE_ITER_ARRAY &merge_iters, + bool is_incremental_row) { int ret = OB_SUCCESS; + + if (is_incremental_row && + 1 == merge_iters.count() && + OB_NOT_NULL(merge_iters.at(0)) && + merge_iters.at(0)->is_major_sstable_iter()) { + is_incremental_row = false; + } + if (OB_FAIL(partition_fuser_->fuse_row(merge_iters))) { STORAGE_LOG(WARN, "Failed to fuse row", KPC_(partition_fuser), K(ret)); - } else if (OB_FAIL(process(partition_fuser_->get_result_row()))) { + } else if (OB_FAIL(process(partition_fuser_->get_result_row(), is_incremental_row))) { STORAGE_LOG(WARN, "Failed to process row", K(ret), K(partition_fuser_->get_result_row())); } else if (OB_FAIL(merge_helper_->move_iters_next(merge_iters))) { STORAGE_LOG(WARN, "failed to move iters", K(ret), K(merge_iters)); @@ -743,7 +759,7 @@ int ObPartitionMajorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) curr_macro_id = curr_macro->macro_block_id_; // TODO maybe we need use macro_block_ctx to decide whether the result row came from the same macro block while (OB_SUCC(ret) && !iter->is_iter_end() && iter->is_macro_block_opened()) { - if (OB_FAIL(merge_same_rowkey_iters(minimum_iters))) { + if (OB_FAIL(merge_same_rowkey_iters(minimum_iters, false))) { STORAGE_LOG(WARN, "failed to merge_same_rowkey_iters", K(ret), K(minimum_iters)); } else if (OB_FAIL(iter->get_curr_macro_block(tmp_macro))) { STORAGE_LOG(WARN, "failed to get curr macro block", K(ret), KPC(tmp_macro)); @@ -893,15 +909,20 @@ int ObPartitionMinorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) return ret; } -int ObPartitionMinorMerger::inner_process(const ObDatumRow &row) +int ObPartitionMinorMerger::inner_process( + const ObDatumRow &row, + bool is_incremental_row) { int ret = OB_SUCCESS; + UNUSED(is_incremental_row); + const blocksstable::ObMacroBlockDesc *macro_desc; if (OB_FAIL(get_base_iter_curr_macro_block(macro_desc))) { STORAGE_LOG(WARN, "Failed to get base iter macro", K(ret)); } else if (OB_FAIL(macro_writer_->append_row(row, macro_desc))) { STORAGE_LOG(WARN, "Failed to append row to macro writer", K(ret)); } else { + macro_writer_->inc_incremental_row_count(); STORAGE_LOG(DEBUG, "Success to append row to minor macro writer", K(ret), K(row)); } @@ -1250,9 +1271,12 @@ int ObPartitionMinorMerger::try_remove_ghost_iters(MERGE_ITER_ARRAY &merge_iters return ret; } -int ObPartitionMinorMerger::merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) +int ObPartitionMinorMerger::merge_same_rowkey_iters( + MERGE_ITER_ARRAY &merge_iters, + bool is_incremental_row) { int ret = OB_SUCCESS; + UNUSED(is_incremental_row); if (OB_UNLIKELY(merge_iters.empty())) { ret = OB_INVALID_ARGUMENT; diff --git a/src/storage/compaction/ob_partition_merger.h b/src/storage/compaction/ob_partition_merger.h index 03e1197c1..ba0dee4cc 100644 --- a/src/storage/compaction/ob_partition_merger.h +++ b/src/storage/compaction/ob_partition_merger.h @@ -132,17 +132,17 @@ public: INHERIT_TO_STRING_KV("ObPartitionMerger", ObMerger, KPC_(merge_progress), K_(data_store_desc), K_(minimum_iters), KP_(validator)); protected: - virtual int inner_process(const blocksstable::ObDatumRow &row) = 0; + virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) = 0; virtual int close() override; virtual int process(const blocksstable::ObMicroBlock µ_block); virtual int process( const blocksstable::ObMacroBlockDesc ¯o_meta, const ObMicroBlockData *micro_block_data); - virtual int process(const blocksstable::ObDatumRow &row); + virtual int process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true); virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) = 0; virtual int merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, int64_t &reuse_row_cnt); virtual int check_macro_block_op(const ObMacroBlockDesc ¯o_desc, ObMacroBlockOp &block_op); - virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) = 0; + virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) = 0; int check_row_columns(const blocksstable::ObDatumRow &row); int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret); @@ -174,12 +174,12 @@ public: const int64_t idx) override; INHERIT_TO_STRING_KV("ObPartitionMajorMerger", ObPartitionMerger, "curr merger", "major merger"); protected: - virtual int inner_process(const blocksstable::ObDatumRow &row) override; + virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) override; private: virtual int inner_init() override; int init_progressive_merge_helper(); virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) override; - virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override; + virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) override; int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt); int reuse_base_sstable(ObPartitionMergeHelper &merge_helper); }; @@ -197,11 +197,11 @@ public: const int64_t idx) override; INHERIT_TO_STRING_KV("ObPartitionMinorMerger", ObPartitionMerger, K_(minimum_iter_idxs)); protected: - virtual int inner_process(const blocksstable::ObDatumRow &row) override; + virtual int inner_process(const blocksstable::ObDatumRow &row, bool is_incremental_row = true) override; int find_minimum_iters_with_same_rowkey(MERGE_ITER_ARRAY &merge_iters, MERGE_ITER_ARRAY &minimum_iters, common::ObIArray &iter_idxs); - virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override; + virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters, bool is_incremental_row = true) override; int try_remove_ghost_iters(MERGE_ITER_ARRAY &merge_iters, const bool shadow_already_output, MERGE_ITER_ARRAY &minimum_iters, diff --git a/src/storage/compaction/ob_progressive_merge_helper.cpp b/src/storage/compaction/ob_progressive_merge_helper.cpp index a8a54f7d5..30502d34c 100644 --- a/src/storage/compaction/ob_progressive_merge_helper.cpp +++ b/src/storage/compaction/ob_progressive_merge_helper.cpp @@ -173,7 +173,7 @@ int ObProgressiveMergeHelper::init( && sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) { check_macro_need_merge_ = true; } - LOG_INFO("finish macro block need merge check", "tablet_id", static_param.get_tablet_id(), K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_)); + FLOG_INFO("finish macro block need merge check", "tablet_id", static_param.get_tablet_id(), K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_)); } }