diff --git a/src/storage/column_store/ob_co_merge_writer.cpp b/src/storage/column_store/ob_co_merge_writer.cpp index 8de9982cf4..534edff523 100644 --- a/src/storage/column_store/ob_co_merge_writer.cpp +++ b/src/storage/column_store/ob_co_merge_writer.cpp @@ -348,7 +348,6 @@ int ObCOMergeWriter::append_iter_curr_row_or_range() } } else { const ObMacroBlockDesc *macro_desc = nullptr; - bool need_rewrite = false; if (OB_FAIL(iter_->get_curr_macro_block(macro_desc))) { STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter_)); @@ -394,7 +393,7 @@ int ObCOMergeWriter::compare(const ObMergeLog &mergelog, int64_t &cmp_ret, const skip_curr_row = true; break; } else if (FALSE_IT(check_iter_range = true)) { - } else if (OB_FAIL(iter_->open_curr_range(false))) { + } else if (OB_FAIL(iter_->open_curr_range(false /* rewrite */))) { STORAGE_LOG(WARN, "failed to open curr range", K(ret), KPC(iter_)); } } @@ -444,7 +443,7 @@ int ObCOMergeWriter::process_macro_rewrite() if (OB_UNLIKELY(iter_->is_macro_block_opened())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "unexpected macro block opened", K(ret), KPC(iter_)); - } else if (OB_FAIL(iter_->open_curr_range(true))) { + } else if (OB_FAIL(iter_->open_curr_range(true /* rewrite */))) { STORAGE_LOG(WARN, "failed to open iter range", K(ret), KPC(iter_)); } else if (OB_ISNULL(iter_->get_curr_row())) { ret = OB_ERR_UNEXPECTED; @@ -539,23 +538,25 @@ int ObCOMergeRowWriter::init(const blocksstable::ObDatumRow &default_row, int ObCOMergeRowWriter::process(const ObMacroBlockDesc ¯o_desc) { int ret = OB_SUCCESS; - bool need_rewrite = false; - if (OB_NOT_NULL(progressive_merge_helper_)) { - if (OB_FAIL(progressive_merge_helper_->need_rewrite_macro_block(macro_desc, need_rewrite))) { - STORAGE_LOG(WARN, "failed to check need_rewrite_macro_block", K(ret), K(macro_desc)); - } else if (need_rewrite) { - progressive_merge_helper_->inc_rewrite_block_cnt(); - } else if (progressive_merge_helper_->need_check_macro_merge() - && OB_FAIL(write_helper_.check_data_macro_block_need_merge(macro_desc, need_rewrite))) { - STORAGE_LOG(WARN, "Failed to check data macro block need merge", K(ret), K(macro_desc)); + ObMacroBlockOp block_op; + if (OB_NOT_NULL(progressive_merge_helper_) && progressive_merge_helper_->is_valid()) { + if (OB_FAIL(progressive_merge_helper_->check_macro_block_op(macro_desc, block_op))) { + STORAGE_LOG(WARN, "failed to check macro operation", K(ret), K(macro_desc)); } } if (OB_FAIL(ret)) { - } else if (need_rewrite) { + } else if (block_op.is_rewrite()) { + progressive_merge_helper_->inc_rewrite_block_cnt(); if (OB_FAIL(process_macro_rewrite())) { STORAGE_LOG(WARN, "failed to process_macro_rewrite", K(ret)); } + } else if (block_op.is_reorg()) { + if (OB_FAIL(iter_->open_curr_range(false /* rewrite */))) { + STORAGE_LOG(WARN, "Failed to open_curr_range", K(ret)); + } else if (OB_FAIL(append_iter_curr_row_or_range())) { + STORAGE_LOG(WARN, "failed to append iter curr row or range", K(ret), KPC(iter_)); + } } else if (OB_FAIL(write_helper_.append_macro_block(macro_desc))) { STORAGE_LOG(WARN, "failed to append macro block", K(ret), K(macro_desc)); } diff --git a/src/storage/column_store/ob_co_merge_writer.h b/src/storage/column_store/ob_co_merge_writer.h index de598235a8..a726da9787 100644 --- a/src/storage/column_store/ob_co_merge_writer.h +++ b/src/storage/column_store/ob_co_merge_writer.h @@ -154,9 +154,9 @@ protected: const bool only_use_row_table = false); void dump_info() const; int process_macro_rewrite(); + int append_iter_curr_row_or_range(); private: int compare(const ObMergeLog &mergelog, int64_t &cmp_ret, const blocksstable::ObDatumRow &row, bool &skip_curr_row) const; - int append_iter_curr_row_or_range(); int process_mergelog_row(const ObMergeLog &mergelog, const blocksstable::ObDatumRow &row); virtual int process(const ObMacroBlockDesc ¯o_desc) = 0; virtual int process(const blocksstable::ObMicroBlock µ_block) = 0; @@ -164,13 +164,11 @@ private: virtual bool is_cg() const { return false; } //temp code protected: compaction::ObLocalArena allocator_; -private: ObDefaultMergeFuser fuser_; ObMergeIter *iter_; blocksstable::ObDatumRow default_row_; bool is_inited_; bool iter_co_build_row_store_; -protected: share::ObDiagnoseLocation *error_location_; }; diff --git a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp index aaa344870f..db98ce06c2 100644 --- a/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp +++ b/src/storage/compaction/ob_basic_tablet_merge_ctx.cpp @@ -227,6 +227,7 @@ int ObStaticMergeParam::cal_minor_merge_param(const bool has_compaction_filter) } else { set_full_merge_and_level(false/*is_full_merge*/); } + data_version_ = DATA_CURRENT_VERSION; } return ret; } diff --git a/src/storage/compaction/ob_partition_merger.cpp b/src/storage/compaction/ob_partition_merger.cpp index c63610e809..1b6b9b7dd3 100644 --- a/src/storage/compaction/ob_partition_merger.cpp +++ b/src/storage/compaction/ob_partition_merger.cpp @@ -33,6 +33,19 @@ using namespace blocksstable; namespace compaction { + +const char * ObMacroBlockOp::block_op_str_[] = { + "BLOCK_OP_NONE", + "BLOCK_OP_REORG", + "BLOCK_OP_REWRITE" +}; + +const char* ObMacroBlockOp::get_block_op_str() const +{ + STATIC_ASSERT(static_cast(OP_REWRITE) + 1 == ARRAYSIZEOF(block_op_str_), "block op array is mismatch"); + return is_valid() ? block_op_str_[block_op_] : "OP_INVALID"; +} + /* *ObDataDescHelper */ @@ -74,6 +87,7 @@ void ObProgressiveMergeHelper::reset() progressive_merge_round_ = 0; rewrite_block_cnt_ = 0; need_rewrite_block_cnt_ = 0; + data_version_ = 0; full_merge_ = false; check_macro_need_merge_ = false; is_inited_ = false; @@ -94,80 +108,77 @@ int ObProgressiveMergeHelper::init(const ObSSTable &sstable, const ObMergeParame bool last_is_small_data_macro = false; const bool is_major = is_major_merge_type(static_param.get_merge_type()); const bool need_calc_progressive_merge = is_major && static_param.progressive_merge_step_ < static_param.progressive_merge_num_; - const bool need_check_macro_merge = !is_major || static_param.data_version_ >= DATA_VERSION_4_1_0_0; progressive_merge_round_ = static_param.progressive_merge_round_; - if (need_calc_progressive_merge || need_check_macro_merge) { - ObSSTableSecMetaIterator *sec_meta_iter = nullptr; - ObDataMacroBlockMeta macro_meta; - const storage::ObITableReadInfo *index_read_info = nullptr; - if (sstable.is_normal_cg_sstable()) { - if (OB_FAIL(MTL(ObTenantCGReadInfoMgr *)->get_index_read_info(index_read_info))) { - STORAGE_LOG(WARN, "failed to get index read info from ObTenantCGReadInfoMgr", KR(ret)); - } - } else { - index_read_info = static_param.rowkey_read_info_; - } - const ObDatumRange &merge_range = sstable.is_normal_cg_sstable() ? merge_param.merge_rowid_range_ : merge_param.merge_range_; - if (OB_FAIL(ret)) { - } else if (OB_ISNULL(index_read_info)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "index read info is unexpected null", KR(ret), KP(index_read_info), K(sstable), K(merge_param)); - } else if (OB_FAIL(sstable.scan_secondary_meta( - allocator, - merge_range, - *index_read_info, - blocksstable::DATA_BLOCK_META, - sec_meta_iter))) { - STORAGE_LOG(WARN, "Fail to scan secondary meta", K(ret), K(merge_param.merge_range_)); + ObSSTableSecMetaIterator *sec_meta_iter = nullptr; + ObDataMacroBlockMeta macro_meta; + const storage::ObITableReadInfo *index_read_info = nullptr; + if (sstable.is_normal_cg_sstable()) { + if (OB_FAIL(MTL(ObTenantCGReadInfoMgr *)->get_index_read_info(index_read_info))) { + STORAGE_LOG(WARN, "failed to get index read info from ObTenantCGReadInfoMgr", KR(ret)); } + } else { + index_read_info = static_param.rowkey_read_info_; + } + const ObDatumRange &merge_range = sstable.is_normal_cg_sstable() ? merge_param.merge_rowid_range_ : merge_param.merge_range_; + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(index_read_info)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "index read info is unexpected null", KR(ret), KP(index_read_info), K(sstable), K(merge_param)); + } else if (OB_FAIL(sstable.scan_secondary_meta( + allocator, + merge_range, + *index_read_info, + blocksstable::DATA_BLOCK_META, + sec_meta_iter))) { + STORAGE_LOG(WARN, "Fail to scan secondary meta", K(ret), K(merge_range)); + } - while (OB_SUCC(ret)) { - if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) { - if (OB_ITER_END != ret) { - STORAGE_LOG(WARN, "Failed to get next macro block", K(ret)); - } else { - ret = OB_SUCCESS; - break; - } - } else if (macro_meta.val_.progressive_merge_round_ < progressive_merge_round_) { - ++rewrite_block_cnt_for_progressive; - } - if (macro_meta.val_.data_zsize_ < OB_DEFAULT_MACRO_BLOCK_SIZE * - ObMacroBlockWriter::DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) { - rewrite_macro_cnt++; - if (last_is_small_data_macro) { - reduce_macro_cnt++; - } - last_is_small_data_macro = true; + while (OB_SUCC(ret)) { + if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) { + if (OB_ITER_END != ret) { + STORAGE_LOG(WARN, "Failed to get next macro block", K(ret)); } else { - last_is_small_data_macro = false; + ret = OB_SUCCESS; + break; } + } else if (macro_meta.val_.progressive_merge_round_ < progressive_merge_round_) { + ++rewrite_block_cnt_for_progressive; } - if (OB_NOT_NULL(sec_meta_iter)) { - sec_meta_iter->~ObSSTableSecMetaIterator(); - allocator.free(sec_meta_iter); - } - if (OB_SUCC(ret)) { - if (need_calc_progressive_merge) { - need_rewrite_block_cnt_ = MAX(rewrite_block_cnt_for_progressive / - (static_param.progressive_merge_num_ - static_param.progressive_merge_step_), 1L); - STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", static_param.get_tablet_id(), - K(need_rewrite_block_cnt_), K(static_param.progressive_merge_step_), - K(static_param.progressive_merge_num_), K(progressive_merge_round_), K(table_idx_)); - } - if (need_check_macro_merge) { - check_macro_need_merge_ = rewrite_macro_cnt <= (reduce_macro_cnt * 2); - if (sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) { - check_macro_need_merge_ = true; - } - STORAGE_LOG(INFO, "finish macro block need merge check", K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_)); + if (macro_meta.val_.data_zsize_ < OB_DEFAULT_MACRO_BLOCK_SIZE * + ObMacroBlockWriter::DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) { + rewrite_macro_cnt++; + if (last_is_small_data_macro) { + reduce_macro_cnt++; } + last_is_small_data_macro = true; + } else { + last_is_small_data_macro = false; } } + if (OB_NOT_NULL(sec_meta_iter)) { + sec_meta_iter->~ObSSTableSecMetaIterator(); + allocator.free(sec_meta_iter); + } + if (OB_SUCC(ret)) { + if (need_calc_progressive_merge) { + need_rewrite_block_cnt_ = MAX(rewrite_block_cnt_for_progressive / + (static_param.progressive_merge_num_ - static_param.progressive_merge_step_), 1L); + STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", static_param.get_tablet_id(), + K(need_rewrite_block_cnt_), K(static_param.progressive_merge_step_), + K(static_param.progressive_merge_num_), K(progressive_merge_round_), K(table_idx_)); + } + check_macro_need_merge_ = rewrite_macro_cnt <= (reduce_macro_cnt * 2); + if (static_param.data_version_ < DATA_VERSION_4_3_2_0 + && sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) { + check_macro_need_merge_ = true; + } + STORAGE_LOG(INFO, "finish macro block need merge check", K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_)); + } if (OB_SUCC(ret)) { + data_version_ = static_param.data_version_; is_inited_ = true; } } @@ -175,23 +186,43 @@ int ObProgressiveMergeHelper::init(const ObSSTable &sstable, const ObMergeParame return ret; } -int ObProgressiveMergeHelper::need_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &need_rewrite) const +int ObProgressiveMergeHelper::check_macro_block_op(const ObMacroBlockDesc ¯o_desc, + ObMacroBlockOp &block_op) const { int ret = OB_SUCCESS; - need_rewrite = false; + block_op.reset(); if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; STORAGE_LOG(WARN, "ObProgressiveMergeHelper not init", K(ret)); + } else if (!macro_desc.is_valid_with_macro_meta()) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "Invalid macro desc", K(ret), K(macro_desc)); } else if (full_merge_) { - need_rewrite = true; - } else if (need_rewrite_block_cnt_ == 0) { - } else if (macro_desc.is_valid_with_macro_meta()) { - const int64_t block_merge_round = macro_desc.macro_meta_->val_.progressive_merge_round_; - need_rewrite = need_rewrite_block_cnt_ > rewrite_block_cnt_ && block_merge_round < progressive_merge_round_; + block_op.set_rewrite(); + } else { + if (need_rewrite_block_cnt_ > 0) { + const int64_t block_merge_round = macro_desc.macro_meta_->val_.progressive_merge_round_; + if(need_rewrite_block_cnt_ > rewrite_block_cnt_ && block_merge_round < progressive_merge_round_) { + block_op.set_rewrite(); + } + } + if (block_op.is_none()) { + if (!check_macro_need_merge_) { + } else if (macro_desc.macro_meta_->val_.data_zsize_ + < OB_SERVER_BLOCK_MGR.get_macro_block_size() * DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) { + // before 432 we need rewrite theis macro block + if (data_version_ < DATA_VERSION_4_3_2_0) { + block_op.set_rewrite(); + } else { + block_op.set_reorg(); + } + } + } } return ret; + } /* @@ -522,18 +553,20 @@ int ObPartitionMerger::merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, i STORAGE_LOG(WARN, "iter macro_block_opened", K(ret), KPC(iter)); } else { const ObMacroBlockDesc *macro_desc = nullptr; + ObMacroBlockOp block_op; if (OB_FAIL(iter->get_curr_macro_block(macro_desc))) { STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter)); } else if (OB_ISNULL(macro_desc) || OB_UNLIKELY(!macro_desc->is_valid())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "Unexpected null macro block", K(ret), KPC(macro_desc), KPC(iter)); - } else if (OB_FAIL(try_rewrite_macro_block(*macro_desc, rewrite))) { + } else if (OB_FAIL(check_macro_block_op(*macro_desc, block_op))) { STORAGE_LOG(WARN, "Failed to try_rewrite_macro_block", K(ret)); - } else if (OB_UNLIKELY(!iter->is_sstable_iter())) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "this is not sstable iter", K(ret), KPC(iter)); - } else if (rewrite || reinterpret_cast(iter->get_table())->is_small_sstable()) { + } else if (block_op.is_rewrite()) { if (OB_FAIL(rewrite_macro_block(minimum_iters))) { + STORAGE_LOG(WARN, "Failed to rewrite macro block", K(ret)); + } + } else if (block_op.is_reorg()) { + if (OB_FAIL(iter->open_curr_range(false /* rewrite */))) { STORAGE_LOG(WARN, "Failed to open_curr_range", K(ret)); } } else if (OB_FAIL(process(*macro_desc))) { @@ -550,15 +583,16 @@ int ObPartitionMerger::merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, i return ret; } -int ObPartitionMerger::try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite) +int ObPartitionMerger::check_macro_block_op(const ObMacroBlockDesc ¯o_desc, ObMacroBlockOp &block_op) { int ret = OB_SUCCESS; - rewrite = false; + block_op.reset(); if (!progressive_merge_helper_.is_valid()) { - } else if (progressive_merge_helper_.need_check_macro_merge() - && OB_FAIL(macro_writer_->check_data_macro_block_need_merge(macro_desc, rewrite))) { - STORAGE_LOG(WARN, "Failed to check data macro block need merge", K(ret), K(macro_desc)); + } else if (OB_FAIL(progressive_merge_helper_.check_macro_block_op(macro_desc, block_op))) { + STORAGE_LOG(WARN, "failed to check macro operation", K(ret), K(macro_desc)); + } else if (block_op.is_rewrite()) { + progressive_merge_helper_.inc_rewrite_block_cnt(); } return ret; @@ -793,23 +827,7 @@ int ObPartitionMajorMerger::merge_micro_block_iter(ObPartitionMergeIter &iter, i return ret; } -int ObPartitionMajorMerger::try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite) -{ - int ret = OB_SUCCESS; - - rewrite = false; - if (!progressive_merge_helper_.is_valid()) { - } else if (OB_FAIL(progressive_merge_helper_.need_rewrite_macro_block(macro_desc, rewrite))) { - STORAGE_LOG(WARN, "failed to check need_rewrite_macro_block", K(ret), K(macro_desc)); - } else if (rewrite) { - progressive_merge_helper_.inc_rewrite_block_cnt(); - } else if (OB_FAIL(ObPartitionMerger::try_rewrite_macro_block(macro_desc, rewrite))) { - STORAGE_LOG(WARN, "fail to try_rewrite_macro_block", K(ret)); - } - - return ret; -} - +//TODO this func should be replaced with ObPartitionMinorMerger:::rewrite_macro_block int ObPartitionMajorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) { int ret = OB_SUCCESS; diff --git a/src/storage/compaction/ob_partition_merger.h b/src/storage/compaction/ob_partition_merger.h index e766f6e6c8..b0f8338eba 100644 --- a/src/storage/compaction/ob_partition_merger.h +++ b/src/storage/compaction/ob_partition_merger.h @@ -48,6 +48,32 @@ class ObPartitionMergeHelper; class ObPartitionMinorMergeHelper; class ObTabletMergeInfo; +struct ObMacroBlockOp { + enum BlockOp: uint8_t { + OP_NONE = 0, + OP_REORG = 1, + OP_REWRITE = 2 + }; + + ObMacroBlockOp() = default; + ~ObMacroBlockOp() = default; + OB_INLINE void reset() { block_op_ = OP_NONE; } + OB_INLINE bool is_none() const { return block_op_ == OP_NONE; } + OB_INLINE bool is_rewrite() const { return block_op_ == OP_REWRITE; } + OB_INLINE bool is_reorg() const { return block_op_ == OP_REORG; } + OB_INLINE bool is_open() const { return is_reorg() || is_rewrite(); } + OB_INLINE void set_rewrite() { block_op_ = OP_REWRITE; } + OB_INLINE void set_reorg() { block_op_ = OP_REORG; } + OB_INLINE bool is_valid() const { return block_op_ <= OP_REWRITE && block_op_ >= OP_NONE; } + const char* get_block_op_str() const; + + TO_STRING_KV("op_type", get_block_op_str()); + + BlockOp block_op_; +private: + const static char * block_op_str_[]; +}; + class ObDataDescHelper final { public: static int build( @@ -65,6 +91,7 @@ public: progressive_merge_round_(0), rewrite_block_cnt_(0), need_rewrite_block_cnt_(0), + data_version_(0), full_merge_(false), check_macro_need_merge_(false), is_inited_(false) @@ -73,17 +100,18 @@ public: int init(const ObSSTable &sstable, const ObMergeParameter &merge_param, ObIAllocator &allocator); void reset(); inline bool is_valid() const { return is_inited_; } - int need_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &need_rewrite) const; + int check_macro_block_op(const ObMacroBlockDesc ¯o_desc, ObMacroBlockOp &block_op) const; inline void inc_rewrite_block_cnt() { rewrite_block_cnt_++; } inline bool is_progressive_merge_finish() { return need_rewrite_block_cnt_ == 0 || rewrite_block_cnt_ >= need_rewrite_block_cnt_; } - inline bool need_check_macro_merge() const { return check_macro_need_merge_; } - TO_STRING_KV(K_(table_idx), K_(progressive_merge_round), K_(rewrite_block_cnt), K_(need_rewrite_block_cnt), K_(full_merge), K_(check_macro_need_merge), K_(is_inited)); + TO_STRING_KV(K_(table_idx), K_(progressive_merge_round), K_(rewrite_block_cnt), K_(need_rewrite_block_cnt), K_(data_version), K_(full_merge), K_(check_macro_need_merge), K_(is_inited)); private: const static int64_t CG_TABLE_CHECK_REWRITE_CNT_ = 4; + const static int64_t DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD = 30; const int64_t table_idx_; int64_t progressive_merge_round_; int64_t rewrite_block_cnt_; int64_t need_rewrite_block_cnt_; + int64_t data_version_; bool full_merge_; bool check_macro_need_merge_; bool is_inited_; @@ -148,7 +176,7 @@ protected: virtual int process(const blocksstable::ObDatumRow &row); virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) = 0; virtual int merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, int64_t &reuse_row_cnt); - virtual int try_rewrite_macro_block(const ObMacroBlockDesc ¯o_block, bool &rewrite); + virtual int check_macro_block_op(const ObMacroBlockDesc ¯o_desc, ObMacroBlockOp &block_op); virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) = 0; int check_row_columns(const blocksstable::ObDatumRow &row); int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret); @@ -184,7 +212,6 @@ protected: private: virtual int inner_init() override; int init_progressive_merge_helper(); - virtual int try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite) override; virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) override; virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override; int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt); diff --git a/src/storage/compaction/ob_partition_rows_merger.cpp b/src/storage/compaction/ob_partition_rows_merger.cpp index bb35002a48..d4a2de8ff7 100644 --- a/src/storage/compaction/ob_partition_rows_merger.cpp +++ b/src/storage/compaction/ob_partition_rows_merger.cpp @@ -923,8 +923,8 @@ ObPartitionMergeIter *ObPartitionMajorMergeHelper::alloc_merge_iter(const ObMerg const ObITable *table) { ObPartitionMergeIter *merge_iter = nullptr; - if (is_base_iter && !merge_param.is_full_merge()) { - if (MICRO_BLOCK_MERGE_LEVEL == merge_param.static_param_.merge_level_ && !is_small_sstable) { + if (is_base_iter && !merge_param.is_full_merge() && !is_small_sstable) { + if (MICRO_BLOCK_MERGE_LEVEL == merge_param.static_param_.merge_level_) { merge_iter = alloc_helper(allocator_, allocator_); } else { merge_iter = alloc_helper(allocator_, allocator_); diff --git a/src/storage/compaction/ob_sstable_builder.cpp b/src/storage/compaction/ob_sstable_builder.cpp index 64af5d4a22..ec48aa22e2 100644 --- a/src/storage/compaction/ob_sstable_builder.cpp +++ b/src/storage/compaction/ob_sstable_builder.cpp @@ -15,6 +15,7 @@ #include "storage/blocksstable/index_block/ob_index_block_builder.h" #include "storage/blocksstable/ob_macro_block_meta.h" #include "storage/ob_sstable_struct.h" +#include "storage/compaction/ob_basic_tablet_merge_ctx.h" namespace oceanbase { @@ -170,7 +171,7 @@ int ObSSTableBuilder::prepare_index_builder() } int ObSSTableBuilder::build_sstable_merge_res( - const share::SCN end_scn, + const ObStaticMergeParam &merge_param, ObSSTableMergeInfo &sstable_merge_info, blocksstable::ObSSTableMergeRes &res) { @@ -179,14 +180,13 @@ int ObSSTableBuilder::build_sstable_merge_res( macro_id_array.set_attr(ObMemAttr(MTL_ID(), "sstBuilder", ObCtxIds::MERGE_NORMAL_CTX_ID)); blocksstable::ObSSTableIndexBuilder::ObMacroMetaIter iter; int64_t multiplexed_macro_block_count = 0; - if (OB_FAIL(rebuild_index_builder_.init(data_store_desc_.get_desc()))) { STORAGE_LOG(WARN, "fail to init", K(ret), K(data_store_desc_)); } else if (OB_FAIL(open_macro_writer())) { STORAGE_LOG(WARN, "fail to open macro writer", K(ret)); } else if (OB_FAIL(index_builder_.init_meta_iter(iter))) { STORAGE_LOG(WARN, "fail to init meta iter", K(ret), K(index_builder_)); - } else if (OB_FAIL(check_need_rebuild(end_scn, macro_id_array, iter, multiplexed_macro_block_count))) { + } else if (OB_FAIL(check_need_rebuild(merge_param, macro_id_array, iter, multiplexed_macro_block_count))) { STORAGE_LOG(WARN, "failed to check need rebuild", K(ret)); } else if (macro_id_array.count() != 0) { iter.reuse(); @@ -208,8 +208,8 @@ int ObSSTableBuilder::build_sstable_merge_res( int ObSSTableBuilder::open_macro_writer() { - int ret = OB_SUCCESS; - blocksstable::ObMacroDataSeq macro_start_seq(0); + int ret = OB_SUCCESS; + blocksstable::ObMacroDataSeq macro_start_seq(0); data_store_desc_.get_desc().sstable_index_builder_ = &rebuild_index_builder_; macro_start_seq.set_rebuild_merge_type(); @@ -220,7 +220,22 @@ int ObSSTableBuilder::open_macro_writer() return ret; } -int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn, +int ObSSTableBuilder::pre_check_rebuild(const ObStaticMergeParam &merge_param, bool &need_check_rebuild) +{ + int ret = OB_SUCCESS; + need_check_rebuild = true; + const int64_t data_version = data_store_desc_.get_desc().get_major_working_cluster_version(); + if (data_version < DATA_VERSION_4_3_0_0) { + need_check_rebuild = false; + } else if (data_version >= DATA_VERSION_4_3_2_0) { + if (merge_param.concurrent_cnt_ <= 1) { + need_check_rebuild = false; + } + } + return ret; +} + +int ObSSTableBuilder::check_need_rebuild(const ObStaticMergeParam &merge_param, ObIArray ¯o_id_array, MetaIter &iter, int64_t &multiplexed_macro_block_count) @@ -228,15 +243,17 @@ int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn, int ret = OB_SUCCESS; macro_id_array.reset(); multiplexed_macro_block_count = 0; - const int64_t snapshot_version = end_scn.get_val_for_tx(); - if (data_store_desc_.get_desc().get_major_working_cluster_version() < DATA_VERSION_4_3_0_0) { - } else { - const blocksstable::ObDataMacroBlockMeta *macro_meta; - blocksstable::MacroBlockId last_macro_id; - int64_t last_macro_block_sum = 0; - int64_t reduce_macro_block_cnt = 0; - bool last_macro_is_first = false; + const int64_t snapshot_version = merge_param.scn_range_.end_scn_.get_val_for_tx(); + const blocksstable::ObDataMacroBlockMeta *macro_meta; + blocksstable::MacroBlockId last_macro_id; + int64_t last_macro_block_sum = 0; + int64_t reduce_macro_block_cnt = 0; + bool last_macro_is_first = false; + bool need_check_rebuild = true; + if (OB_FAIL(pre_check_rebuild(merge_param, need_check_rebuild))) { + STORAGE_LOG(WARN, "Fail to pre check need rebuild", K(ret)); + } else if (need_check_rebuild) { while (OB_SUCC(ret) && OB_SUCC(iter.get_next_macro_block(macro_meta))) { if (OB_ISNULL(macro_meta)) { ret = OB_ERR_UNEXPECTED; @@ -252,7 +269,7 @@ int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn, last_macro_is_first = true; last_macro_block_sum = macro_block_sum; multiplexed_macro_block_count = snapshot_version != macro_meta->val_.snapshot_version_ ? - multiplexed_macro_block_count + 1 : multiplexed_macro_block_count; + multiplexed_macro_block_count + 1 : multiplexed_macro_block_count; } else { if (last_macro_is_first && OB_FAIL(macro_id_array.push_back(last_macro_id))) { STORAGE_LOG(WARN, "failed to push back macro id", K(ret), K(last_macro_id)); diff --git a/src/storage/compaction/ob_sstable_builder.h b/src/storage/compaction/ob_sstable_builder.h index a38f6ff5c0..078f85f949 100644 --- a/src/storage/compaction/ob_sstable_builder.h +++ b/src/storage/compaction/ob_sstable_builder.h @@ -26,6 +26,7 @@ class MacroBlockId; } namespace compaction { +struct ObStaticMergeParam; class ObSSTableRebuildMicroBlockIter final { @@ -80,7 +81,7 @@ public: void reset(); int set_index_read_info(const ObITableReadInfo *read_info); int prepare_index_builder(); - int build_sstable_merge_res(const share::SCN end_scn, ObSSTableMergeInfo &sstable_merge_info_, blocksstable::ObSSTableMergeRes &res); + int build_sstable_merge_res(const ObStaticMergeParam &merge_param, ObSSTableMergeInfo &sstable_merge_info_, blocksstable::ObSSTableMergeRes &res); int build_reused_small_sst_merge_res( const int64_t macro_read_size, const int64_t macro_offset, @@ -100,10 +101,11 @@ private: const blocksstable::ObDataMacroBlockMeta &curr_macro_meta, bool &need_merge); int rewrite_macro_block(ObSSTableRebuildMicroBlockIter µ_iter); - int check_need_rebuild(const share::SCN end_scn, + int check_need_rebuild(const ObStaticMergeParam &merge_param, ObIArray ¯o_id_array, MetaIter &iter, int64_t &multiplexed_macro_block_count); + int pre_check_rebuild(const ObStaticMergeParam &merge_param, bool &need_check_rebuild); bool check_macro_block_could_merge(const blocksstable::ObDataMacroBlockMeta ¯o_meta) const { return data_store_desc_.get_desc().get_row_store_type() == macro_meta.val_.row_store_type_ @@ -123,4 +125,4 @@ private: } // namespace compaction } // namespace oceanbase -#endif \ No newline at end of file +#endif diff --git a/src/storage/compaction/ob_tablet_merge_info.cpp b/src/storage/compaction/ob_tablet_merge_info.cpp index 639724e228..d7db573031 100644 --- a/src/storage/compaction/ob_tablet_merge_info.cpp +++ b/src/storage/compaction/ob_tablet_merge_info.cpp @@ -231,7 +231,7 @@ int ObTabletMergeInfo::create_sstable( SMART_VARS_2((ObSSTableMergeRes, res), (ObTabletCreateSSTableParam, param)) { if (!is_reused_small_sst - && OB_FAIL(sstable_builder_.build_sstable_merge_res(ctx.static_param_.scn_range_.end_scn_, sstable_merge_info_, res))) { + && OB_FAIL(sstable_builder_.build_sstable_merge_res(ctx.static_param_, sstable_merge_info_, res))) { LOG_WARN("fail to close index builder", K(ret), KPC(sstable), "is_small_sst", sstable->is_small_sstable()); CTX_SET_DIAGNOSE_LOCATION(ctx); } else if (is_reused_small_sst && OB_FAIL(sstable_builder_.build_reused_small_sst_merge_res(sstable->get_macro_read_size(),