add reorg macro block op for compaction

This commit is contained in:
chaser-ch
2024-06-19 10:30:48 +00:00
committed by ob-robot
parent b5e83b243b
commit 48d19b380c
9 changed files with 201 additions and 137 deletions

View File

@ -348,7 +348,6 @@ int ObCOMergeWriter::append_iter_curr_row_or_range()
} }
} else { } else {
const ObMacroBlockDesc *macro_desc = nullptr; const ObMacroBlockDesc *macro_desc = nullptr;
bool need_rewrite = false;
if (OB_FAIL(iter_->get_curr_macro_block(macro_desc))) { if (OB_FAIL(iter_->get_curr_macro_block(macro_desc))) {
STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter_)); STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter_));
@ -394,7 +393,7 @@ int ObCOMergeWriter::compare(const ObMergeLog &mergelog, int64_t &cmp_ret, const
skip_curr_row = true; skip_curr_row = true;
break; break;
} else if (FALSE_IT(check_iter_range = true)) { } else if (FALSE_IT(check_iter_range = true)) {
} else if (OB_FAIL(iter_->open_curr_range(false))) { } else if (OB_FAIL(iter_->open_curr_range(false /* rewrite */))) {
STORAGE_LOG(WARN, "failed to open curr range", K(ret), KPC(iter_)); STORAGE_LOG(WARN, "failed to open curr range", K(ret), KPC(iter_));
} }
} }
@ -444,7 +443,7 @@ int ObCOMergeWriter::process_macro_rewrite()
if (OB_UNLIKELY(iter_->is_macro_block_opened())) { if (OB_UNLIKELY(iter_->is_macro_block_opened())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected macro block opened", K(ret), KPC(iter_)); STORAGE_LOG(WARN, "unexpected macro block opened", K(ret), KPC(iter_));
} else if (OB_FAIL(iter_->open_curr_range(true))) { } else if (OB_FAIL(iter_->open_curr_range(true /* rewrite */))) {
STORAGE_LOG(WARN, "failed to open iter range", K(ret), KPC(iter_)); STORAGE_LOG(WARN, "failed to open iter range", K(ret), KPC(iter_));
} else if (OB_ISNULL(iter_->get_curr_row())) { } else if (OB_ISNULL(iter_->get_curr_row())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
@ -539,23 +538,25 @@ int ObCOMergeRowWriter::init(const blocksstable::ObDatumRow &default_row,
int ObCOMergeRowWriter::process(const ObMacroBlockDesc &macro_desc) int ObCOMergeRowWriter::process(const ObMacroBlockDesc &macro_desc)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
bool need_rewrite = false; ObMacroBlockOp block_op;
if (OB_NOT_NULL(progressive_merge_helper_)) { if (OB_NOT_NULL(progressive_merge_helper_) && progressive_merge_helper_->is_valid()) {
if (OB_FAIL(progressive_merge_helper_->need_rewrite_macro_block(macro_desc, need_rewrite))) { if (OB_FAIL(progressive_merge_helper_->check_macro_block_op(macro_desc, block_op))) {
STORAGE_LOG(WARN, "failed to check need_rewrite_macro_block", K(ret), K(macro_desc)); STORAGE_LOG(WARN, "failed to check macro operation", K(ret), K(macro_desc));
} else if (need_rewrite) {
progressive_merge_helper_->inc_rewrite_block_cnt();
} else if (progressive_merge_helper_->need_check_macro_merge()
&& OB_FAIL(write_helper_.check_data_macro_block_need_merge(macro_desc, need_rewrite))) {
STORAGE_LOG(WARN, "Failed to check data macro block need merge", K(ret), K(macro_desc));
} }
} }
if (OB_FAIL(ret)) { if (OB_FAIL(ret)) {
} else if (need_rewrite) { } else if (block_op.is_rewrite()) {
progressive_merge_helper_->inc_rewrite_block_cnt();
if (OB_FAIL(process_macro_rewrite())) { if (OB_FAIL(process_macro_rewrite())) {
STORAGE_LOG(WARN, "failed to process_macro_rewrite", K(ret)); STORAGE_LOG(WARN, "failed to process_macro_rewrite", K(ret));
} }
} else if (block_op.is_reorg()) {
if (OB_FAIL(iter_->open_curr_range(false /* rewrite */))) {
STORAGE_LOG(WARN, "Failed to open_curr_range", K(ret));
} else if (OB_FAIL(append_iter_curr_row_or_range())) {
STORAGE_LOG(WARN, "failed to append iter curr row or range", K(ret), KPC(iter_));
}
} else if (OB_FAIL(write_helper_.append_macro_block(macro_desc))) { } else if (OB_FAIL(write_helper_.append_macro_block(macro_desc))) {
STORAGE_LOG(WARN, "failed to append macro block", K(ret), K(macro_desc)); STORAGE_LOG(WARN, "failed to append macro block", K(ret), K(macro_desc));
} }

View File

@ -154,9 +154,9 @@ protected:
const bool only_use_row_table = false); const bool only_use_row_table = false);
void dump_info() const; void dump_info() const;
int process_macro_rewrite(); int process_macro_rewrite();
int append_iter_curr_row_or_range();
private: private:
int compare(const ObMergeLog &mergelog, int64_t &cmp_ret, const blocksstable::ObDatumRow &row, bool &skip_curr_row) const; int compare(const ObMergeLog &mergelog, int64_t &cmp_ret, const blocksstable::ObDatumRow &row, bool &skip_curr_row) const;
int append_iter_curr_row_or_range();
int process_mergelog_row(const ObMergeLog &mergelog, const blocksstable::ObDatumRow &row); int process_mergelog_row(const ObMergeLog &mergelog, const blocksstable::ObDatumRow &row);
virtual int process(const ObMacroBlockDesc &macro_desc) = 0; virtual int process(const ObMacroBlockDesc &macro_desc) = 0;
virtual int process(const blocksstable::ObMicroBlock &micro_block) = 0; virtual int process(const blocksstable::ObMicroBlock &micro_block) = 0;
@ -164,13 +164,11 @@ private:
virtual bool is_cg() const { return false; } //temp code virtual bool is_cg() const { return false; } //temp code
protected: protected:
compaction::ObLocalArena allocator_; compaction::ObLocalArena allocator_;
private:
ObDefaultMergeFuser fuser_; ObDefaultMergeFuser fuser_;
ObMergeIter *iter_; ObMergeIter *iter_;
blocksstable::ObDatumRow default_row_; blocksstable::ObDatumRow default_row_;
bool is_inited_; bool is_inited_;
bool iter_co_build_row_store_; bool iter_co_build_row_store_;
protected:
share::ObDiagnoseLocation *error_location_; share::ObDiagnoseLocation *error_location_;
}; };

View File

@ -227,6 +227,7 @@ int ObStaticMergeParam::cal_minor_merge_param(const bool has_compaction_filter)
} else { } else {
set_full_merge_and_level(false/*is_full_merge*/); set_full_merge_and_level(false/*is_full_merge*/);
} }
data_version_ = DATA_CURRENT_VERSION;
} }
return ret; return ret;
} }

View File

@ -33,6 +33,19 @@ using namespace blocksstable;
namespace compaction namespace compaction
{ {
const char * ObMacroBlockOp::block_op_str_[] = {
"BLOCK_OP_NONE",
"BLOCK_OP_REORG",
"BLOCK_OP_REWRITE"
};
const char* ObMacroBlockOp::get_block_op_str() const
{
STATIC_ASSERT(static_cast<int64_t>(OP_REWRITE) + 1 == ARRAYSIZEOF(block_op_str_), "block op array is mismatch");
return is_valid() ? block_op_str_[block_op_] : "OP_INVALID";
}
/* /*
*ObDataDescHelper *ObDataDescHelper
*/ */
@ -74,6 +87,7 @@ void ObProgressiveMergeHelper::reset()
progressive_merge_round_ = 0; progressive_merge_round_ = 0;
rewrite_block_cnt_ = 0; rewrite_block_cnt_ = 0;
need_rewrite_block_cnt_ = 0; need_rewrite_block_cnt_ = 0;
data_version_ = 0;
full_merge_ = false; full_merge_ = false;
check_macro_need_merge_ = false; check_macro_need_merge_ = false;
is_inited_ = false; is_inited_ = false;
@ -94,80 +108,77 @@ int ObProgressiveMergeHelper::init(const ObSSTable &sstable, const ObMergeParame
bool last_is_small_data_macro = false; bool last_is_small_data_macro = false;
const bool is_major = is_major_merge_type(static_param.get_merge_type()); const bool is_major = is_major_merge_type(static_param.get_merge_type());
const bool need_calc_progressive_merge = is_major && static_param.progressive_merge_step_ < static_param.progressive_merge_num_; const bool need_calc_progressive_merge = is_major && static_param.progressive_merge_step_ < static_param.progressive_merge_num_;
const bool need_check_macro_merge = !is_major || static_param.data_version_ >= DATA_VERSION_4_1_0_0;
progressive_merge_round_ = static_param.progressive_merge_round_; progressive_merge_round_ = static_param.progressive_merge_round_;
if (need_calc_progressive_merge || need_check_macro_merge) { ObSSTableSecMetaIterator *sec_meta_iter = nullptr;
ObSSTableSecMetaIterator *sec_meta_iter = nullptr; ObDataMacroBlockMeta macro_meta;
ObDataMacroBlockMeta macro_meta; const storage::ObITableReadInfo *index_read_info = nullptr;
const storage::ObITableReadInfo *index_read_info = nullptr; if (sstable.is_normal_cg_sstable()) {
if (sstable.is_normal_cg_sstable()) { if (OB_FAIL(MTL(ObTenantCGReadInfoMgr *)->get_index_read_info(index_read_info))) {
if (OB_FAIL(MTL(ObTenantCGReadInfoMgr *)->get_index_read_info(index_read_info))) { STORAGE_LOG(WARN, "failed to get index read info from ObTenantCGReadInfoMgr", KR(ret));
STORAGE_LOG(WARN, "failed to get index read info from ObTenantCGReadInfoMgr", KR(ret));
}
} else {
index_read_info = static_param.rowkey_read_info_;
}
const ObDatumRange &merge_range = sstable.is_normal_cg_sstable() ? merge_param.merge_rowid_range_ : merge_param.merge_range_;
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(index_read_info)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "index read info is unexpected null", KR(ret), KP(index_read_info), K(sstable), K(merge_param));
} else if (OB_FAIL(sstable.scan_secondary_meta(
allocator,
merge_range,
*index_read_info,
blocksstable::DATA_BLOCK_META,
sec_meta_iter))) {
STORAGE_LOG(WARN, "Fail to scan secondary meta", K(ret), K(merge_param.merge_range_));
} }
} else {
index_read_info = static_param.rowkey_read_info_;
}
const ObDatumRange &merge_range = sstable.is_normal_cg_sstable() ? merge_param.merge_rowid_range_ : merge_param.merge_range_;
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(index_read_info)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "index read info is unexpected null", KR(ret), KP(index_read_info), K(sstable), K(merge_param));
} else if (OB_FAIL(sstable.scan_secondary_meta(
allocator,
merge_range,
*index_read_info,
blocksstable::DATA_BLOCK_META,
sec_meta_iter))) {
STORAGE_LOG(WARN, "Fail to scan secondary meta", K(ret), K(merge_range));
}
while (OB_SUCC(ret)) { while (OB_SUCC(ret)) {
if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) { if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) {
if (OB_ITER_END != ret) { if (OB_ITER_END != ret) {
STORAGE_LOG(WARN, "Failed to get next macro block", K(ret)); STORAGE_LOG(WARN, "Failed to get next macro block", K(ret));
} else {
ret = OB_SUCCESS;
break;
}
} else if (macro_meta.val_.progressive_merge_round_ < progressive_merge_round_) {
++rewrite_block_cnt_for_progressive;
}
if (macro_meta.val_.data_zsize_ < OB_DEFAULT_MACRO_BLOCK_SIZE *
ObMacroBlockWriter::DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) {
rewrite_macro_cnt++;
if (last_is_small_data_macro) {
reduce_macro_cnt++;
}
last_is_small_data_macro = true;
} else { } else {
last_is_small_data_macro = false; ret = OB_SUCCESS;
break;
} }
} else if (macro_meta.val_.progressive_merge_round_ < progressive_merge_round_) {
++rewrite_block_cnt_for_progressive;
} }
if (OB_NOT_NULL(sec_meta_iter)) { if (macro_meta.val_.data_zsize_ < OB_DEFAULT_MACRO_BLOCK_SIZE *
sec_meta_iter->~ObSSTableSecMetaIterator(); ObMacroBlockWriter::DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) {
allocator.free(sec_meta_iter); rewrite_macro_cnt++;
} if (last_is_small_data_macro) {
if (OB_SUCC(ret)) { reduce_macro_cnt++;
if (need_calc_progressive_merge) {
need_rewrite_block_cnt_ = MAX(rewrite_block_cnt_for_progressive /
(static_param.progressive_merge_num_ - static_param.progressive_merge_step_), 1L);
STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", static_param.get_tablet_id(),
K(need_rewrite_block_cnt_), K(static_param.progressive_merge_step_),
K(static_param.progressive_merge_num_), K(progressive_merge_round_), K(table_idx_));
}
if (need_check_macro_merge) {
check_macro_need_merge_ = rewrite_macro_cnt <= (reduce_macro_cnt * 2);
if (sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) {
check_macro_need_merge_ = true;
}
STORAGE_LOG(INFO, "finish macro block need merge check", K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_));
} }
last_is_small_data_macro = true;
} else {
last_is_small_data_macro = false;
} }
} }
if (OB_NOT_NULL(sec_meta_iter)) {
sec_meta_iter->~ObSSTableSecMetaIterator();
allocator.free(sec_meta_iter);
}
if (OB_SUCC(ret)) {
if (need_calc_progressive_merge) {
need_rewrite_block_cnt_ = MAX(rewrite_block_cnt_for_progressive /
(static_param.progressive_merge_num_ - static_param.progressive_merge_step_), 1L);
STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", static_param.get_tablet_id(),
K(need_rewrite_block_cnt_), K(static_param.progressive_merge_step_),
K(static_param.progressive_merge_num_), K(progressive_merge_round_), K(table_idx_));
}
check_macro_need_merge_ = rewrite_macro_cnt <= (reduce_macro_cnt * 2);
if (static_param.data_version_ < DATA_VERSION_4_3_2_0
&& sstable.is_normal_cg_sstable() && rewrite_macro_cnt < CG_TABLE_CHECK_REWRITE_CNT_) {
check_macro_need_merge_ = true;
}
STORAGE_LOG(INFO, "finish macro block need merge check", K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt), K(table_idx_));
}
if (OB_SUCC(ret)) { if (OB_SUCC(ret)) {
data_version_ = static_param.data_version_;
is_inited_ = true; is_inited_ = true;
} }
} }
@ -175,23 +186,43 @@ int ObProgressiveMergeHelper::init(const ObSSTable &sstable, const ObMergeParame
return ret; return ret;
} }
int ObProgressiveMergeHelper::need_rewrite_macro_block(const ObMacroBlockDesc &macro_desc, bool &need_rewrite) const int ObProgressiveMergeHelper::check_macro_block_op(const ObMacroBlockDesc &macro_desc,
ObMacroBlockOp &block_op) const
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
need_rewrite = false;
block_op.reset();
if (OB_UNLIKELY(!is_inited_)) { if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT; ret = OB_NOT_INIT;
STORAGE_LOG(WARN, "ObProgressiveMergeHelper not init", K(ret)); STORAGE_LOG(WARN, "ObProgressiveMergeHelper not init", K(ret));
} else if (!macro_desc.is_valid_with_macro_meta()) {
ret = OB_INVALID_ARGUMENT;
STORAGE_LOG(WARN, "Invalid macro desc", K(ret), K(macro_desc));
} else if (full_merge_) { } else if (full_merge_) {
need_rewrite = true; block_op.set_rewrite();
} else if (need_rewrite_block_cnt_ == 0) { } else {
} else if (macro_desc.is_valid_with_macro_meta()) { if (need_rewrite_block_cnt_ > 0) {
const int64_t block_merge_round = macro_desc.macro_meta_->val_.progressive_merge_round_; const int64_t block_merge_round = macro_desc.macro_meta_->val_.progressive_merge_round_;
need_rewrite = need_rewrite_block_cnt_ > rewrite_block_cnt_ && block_merge_round < progressive_merge_round_; if(need_rewrite_block_cnt_ > rewrite_block_cnt_ && block_merge_round < progressive_merge_round_) {
block_op.set_rewrite();
}
}
if (block_op.is_none()) {
if (!check_macro_need_merge_) {
} else if (macro_desc.macro_meta_->val_.data_zsize_
< OB_SERVER_BLOCK_MGR.get_macro_block_size() * DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) {
// before 432 we need rewrite theis macro block
if (data_version_ < DATA_VERSION_4_3_2_0) {
block_op.set_rewrite();
} else {
block_op.set_reorg();
}
}
}
} }
return ret; return ret;
} }
/* /*
@ -522,18 +553,20 @@ int ObPartitionMerger::merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, i
STORAGE_LOG(WARN, "iter macro_block_opened", K(ret), KPC(iter)); STORAGE_LOG(WARN, "iter macro_block_opened", K(ret), KPC(iter));
} else { } else {
const ObMacroBlockDesc *macro_desc = nullptr; const ObMacroBlockDesc *macro_desc = nullptr;
ObMacroBlockOp block_op;
if (OB_FAIL(iter->get_curr_macro_block(macro_desc))) { if (OB_FAIL(iter->get_curr_macro_block(macro_desc))) {
STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter)); STORAGE_LOG(WARN, "Failed to get current micro block", K(ret), KPC(iter));
} else if (OB_ISNULL(macro_desc) || OB_UNLIKELY(!macro_desc->is_valid())) { } else if (OB_ISNULL(macro_desc) || OB_UNLIKELY(!macro_desc->is_valid())) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "Unexpected null macro block", K(ret), KPC(macro_desc), KPC(iter)); STORAGE_LOG(WARN, "Unexpected null macro block", K(ret), KPC(macro_desc), KPC(iter));
} else if (OB_FAIL(try_rewrite_macro_block(*macro_desc, rewrite))) { } else if (OB_FAIL(check_macro_block_op(*macro_desc, block_op))) {
STORAGE_LOG(WARN, "Failed to try_rewrite_macro_block", K(ret)); STORAGE_LOG(WARN, "Failed to try_rewrite_macro_block", K(ret));
} else if (OB_UNLIKELY(!iter->is_sstable_iter())) { } else if (block_op.is_rewrite()) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "this is not sstable iter", K(ret), KPC(iter));
} else if (rewrite || reinterpret_cast<const ObSSTable*>(iter->get_table())->is_small_sstable()) {
if (OB_FAIL(rewrite_macro_block(minimum_iters))) { if (OB_FAIL(rewrite_macro_block(minimum_iters))) {
STORAGE_LOG(WARN, "Failed to rewrite macro block", K(ret));
}
} else if (block_op.is_reorg()) {
if (OB_FAIL(iter->open_curr_range(false /* rewrite */))) {
STORAGE_LOG(WARN, "Failed to open_curr_range", K(ret)); STORAGE_LOG(WARN, "Failed to open_curr_range", K(ret));
} }
} else if (OB_FAIL(process(*macro_desc))) { } else if (OB_FAIL(process(*macro_desc))) {
@ -550,15 +583,16 @@ int ObPartitionMerger::merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, i
return ret; return ret;
} }
int ObPartitionMerger::try_rewrite_macro_block(const ObMacroBlockDesc &macro_desc, bool &rewrite) int ObPartitionMerger::check_macro_block_op(const ObMacroBlockDesc &macro_desc, ObMacroBlockOp &block_op)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
rewrite = false; block_op.reset();
if (!progressive_merge_helper_.is_valid()) { if (!progressive_merge_helper_.is_valid()) {
} else if (progressive_merge_helper_.need_check_macro_merge() } else if (OB_FAIL(progressive_merge_helper_.check_macro_block_op(macro_desc, block_op))) {
&& OB_FAIL(macro_writer_->check_data_macro_block_need_merge(macro_desc, rewrite))) { STORAGE_LOG(WARN, "failed to check macro operation", K(ret), K(macro_desc));
STORAGE_LOG(WARN, "Failed to check data macro block need merge", K(ret), K(macro_desc)); } else if (block_op.is_rewrite()) {
progressive_merge_helper_.inc_rewrite_block_cnt();
} }
return ret; return ret;
@ -793,23 +827,7 @@ int ObPartitionMajorMerger::merge_micro_block_iter(ObPartitionMergeIter &iter, i
return ret; return ret;
} }
int ObPartitionMajorMerger::try_rewrite_macro_block(const ObMacroBlockDesc &macro_desc, bool &rewrite) //TODO this func should be replaced with ObPartitionMinorMerger:::rewrite_macro_block
{
int ret = OB_SUCCESS;
rewrite = false;
if (!progressive_merge_helper_.is_valid()) {
} else if (OB_FAIL(progressive_merge_helper_.need_rewrite_macro_block(macro_desc, rewrite))) {
STORAGE_LOG(WARN, "failed to check need_rewrite_macro_block", K(ret), K(macro_desc));
} else if (rewrite) {
progressive_merge_helper_.inc_rewrite_block_cnt();
} else if (OB_FAIL(ObPartitionMerger::try_rewrite_macro_block(macro_desc, rewrite))) {
STORAGE_LOG(WARN, "fail to try_rewrite_macro_block", K(ret));
}
return ret;
}
int ObPartitionMajorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) int ObPartitionMajorMerger::rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters)
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;

View File

@ -48,6 +48,32 @@ class ObPartitionMergeHelper;
class ObPartitionMinorMergeHelper; class ObPartitionMinorMergeHelper;
class ObTabletMergeInfo; class ObTabletMergeInfo;
struct ObMacroBlockOp {
enum BlockOp: uint8_t {
OP_NONE = 0,
OP_REORG = 1,
OP_REWRITE = 2
};
ObMacroBlockOp() = default;
~ObMacroBlockOp() = default;
OB_INLINE void reset() { block_op_ = OP_NONE; }
OB_INLINE bool is_none() const { return block_op_ == OP_NONE; }
OB_INLINE bool is_rewrite() const { return block_op_ == OP_REWRITE; }
OB_INLINE bool is_reorg() const { return block_op_ == OP_REORG; }
OB_INLINE bool is_open() const { return is_reorg() || is_rewrite(); }
OB_INLINE void set_rewrite() { block_op_ = OP_REWRITE; }
OB_INLINE void set_reorg() { block_op_ = OP_REORG; }
OB_INLINE bool is_valid() const { return block_op_ <= OP_REWRITE && block_op_ >= OP_NONE; }
const char* get_block_op_str() const;
TO_STRING_KV("op_type", get_block_op_str());
BlockOp block_op_;
private:
const static char * block_op_str_[];
};
class ObDataDescHelper final { class ObDataDescHelper final {
public: public:
static int build( static int build(
@ -65,6 +91,7 @@ public:
progressive_merge_round_(0), progressive_merge_round_(0),
rewrite_block_cnt_(0), rewrite_block_cnt_(0),
need_rewrite_block_cnt_(0), need_rewrite_block_cnt_(0),
data_version_(0),
full_merge_(false), full_merge_(false),
check_macro_need_merge_(false), check_macro_need_merge_(false),
is_inited_(false) is_inited_(false)
@ -73,17 +100,18 @@ public:
int init(const ObSSTable &sstable, const ObMergeParameter &merge_param, ObIAllocator &allocator); int init(const ObSSTable &sstable, const ObMergeParameter &merge_param, ObIAllocator &allocator);
void reset(); void reset();
inline bool is_valid() const { return is_inited_; } inline bool is_valid() const { return is_inited_; }
int need_rewrite_macro_block(const ObMacroBlockDesc &macro_desc, bool &need_rewrite) const; int check_macro_block_op(const ObMacroBlockDesc &macro_desc, ObMacroBlockOp &block_op) const;
inline void inc_rewrite_block_cnt() { rewrite_block_cnt_++; } inline void inc_rewrite_block_cnt() { rewrite_block_cnt_++; }
inline bool is_progressive_merge_finish() { return need_rewrite_block_cnt_ == 0 || rewrite_block_cnt_ >= need_rewrite_block_cnt_; } inline bool is_progressive_merge_finish() { return need_rewrite_block_cnt_ == 0 || rewrite_block_cnt_ >= need_rewrite_block_cnt_; }
inline bool need_check_macro_merge() const { return check_macro_need_merge_; } TO_STRING_KV(K_(table_idx), K_(progressive_merge_round), K_(rewrite_block_cnt), K_(need_rewrite_block_cnt), K_(data_version), K_(full_merge), K_(check_macro_need_merge), K_(is_inited));
TO_STRING_KV(K_(table_idx), K_(progressive_merge_round), K_(rewrite_block_cnt), K_(need_rewrite_block_cnt), K_(full_merge), K_(check_macro_need_merge), K_(is_inited));
private: private:
const static int64_t CG_TABLE_CHECK_REWRITE_CNT_ = 4; const static int64_t CG_TABLE_CHECK_REWRITE_CNT_ = 4;
const static int64_t DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD = 30;
const int64_t table_idx_; const int64_t table_idx_;
int64_t progressive_merge_round_; int64_t progressive_merge_round_;
int64_t rewrite_block_cnt_; int64_t rewrite_block_cnt_;
int64_t need_rewrite_block_cnt_; int64_t need_rewrite_block_cnt_;
int64_t data_version_;
bool full_merge_; bool full_merge_;
bool check_macro_need_merge_; bool check_macro_need_merge_;
bool is_inited_; bool is_inited_;
@ -148,7 +176,7 @@ protected:
virtual int process(const blocksstable::ObDatumRow &row); virtual int process(const blocksstable::ObDatumRow &row);
virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) = 0; virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) = 0;
virtual int merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, int64_t &reuse_row_cnt); virtual int merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, int64_t &reuse_row_cnt);
virtual int try_rewrite_macro_block(const ObMacroBlockDesc &macro_block, bool &rewrite); virtual int check_macro_block_op(const ObMacroBlockDesc &macro_desc, ObMacroBlockOp &block_op);
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) = 0; virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) = 0;
int check_row_columns(const blocksstable::ObDatumRow &row); int check_row_columns(const blocksstable::ObDatumRow &row);
int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret); int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret);
@ -184,7 +212,6 @@ protected:
private: private:
virtual int inner_init() override; virtual int inner_init() override;
int init_progressive_merge_helper(); int init_progressive_merge_helper();
virtual int try_rewrite_macro_block(const ObMacroBlockDesc &macro_desc, bool &rewrite) override;
virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) override; virtual int rewrite_macro_block(MERGE_ITER_ARRAY &minimum_iters) override;
virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override; virtual int merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters) override;
int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt); int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt);

View File

@ -923,8 +923,8 @@ ObPartitionMergeIter *ObPartitionMajorMergeHelper::alloc_merge_iter(const ObMerg
const ObITable *table) const ObITable *table)
{ {
ObPartitionMergeIter *merge_iter = nullptr; ObPartitionMergeIter *merge_iter = nullptr;
if (is_base_iter && !merge_param.is_full_merge()) { if (is_base_iter && !merge_param.is_full_merge() && !is_small_sstable) {
if (MICRO_BLOCK_MERGE_LEVEL == merge_param.static_param_.merge_level_ && !is_small_sstable) { if (MICRO_BLOCK_MERGE_LEVEL == merge_param.static_param_.merge_level_) {
merge_iter = alloc_helper<ObPartitionMicroMergeIter>(allocator_, allocator_); merge_iter = alloc_helper<ObPartitionMicroMergeIter>(allocator_, allocator_);
} else { } else {
merge_iter = alloc_helper<ObPartitionMacroMergeIter>(allocator_, allocator_); merge_iter = alloc_helper<ObPartitionMacroMergeIter>(allocator_, allocator_);

View File

@ -15,6 +15,7 @@
#include "storage/blocksstable/index_block/ob_index_block_builder.h" #include "storage/blocksstable/index_block/ob_index_block_builder.h"
#include "storage/blocksstable/ob_macro_block_meta.h" #include "storage/blocksstable/ob_macro_block_meta.h"
#include "storage/ob_sstable_struct.h" #include "storage/ob_sstable_struct.h"
#include "storage/compaction/ob_basic_tablet_merge_ctx.h"
namespace oceanbase namespace oceanbase
{ {
@ -170,7 +171,7 @@ int ObSSTableBuilder::prepare_index_builder()
} }
int ObSSTableBuilder::build_sstable_merge_res( int ObSSTableBuilder::build_sstable_merge_res(
const share::SCN end_scn, const ObStaticMergeParam &merge_param,
ObSSTableMergeInfo &sstable_merge_info, ObSSTableMergeInfo &sstable_merge_info,
blocksstable::ObSSTableMergeRes &res) blocksstable::ObSSTableMergeRes &res)
{ {
@ -179,14 +180,13 @@ int ObSSTableBuilder::build_sstable_merge_res(
macro_id_array.set_attr(ObMemAttr(MTL_ID(), "sstBuilder", ObCtxIds::MERGE_NORMAL_CTX_ID)); macro_id_array.set_attr(ObMemAttr(MTL_ID(), "sstBuilder", ObCtxIds::MERGE_NORMAL_CTX_ID));
blocksstable::ObSSTableIndexBuilder::ObMacroMetaIter iter; blocksstable::ObSSTableIndexBuilder::ObMacroMetaIter iter;
int64_t multiplexed_macro_block_count = 0; int64_t multiplexed_macro_block_count = 0;
if (OB_FAIL(rebuild_index_builder_.init(data_store_desc_.get_desc()))) { if (OB_FAIL(rebuild_index_builder_.init(data_store_desc_.get_desc()))) {
STORAGE_LOG(WARN, "fail to init", K(ret), K(data_store_desc_)); STORAGE_LOG(WARN, "fail to init", K(ret), K(data_store_desc_));
} else if (OB_FAIL(open_macro_writer())) { } else if (OB_FAIL(open_macro_writer())) {
STORAGE_LOG(WARN, "fail to open macro writer", K(ret)); STORAGE_LOG(WARN, "fail to open macro writer", K(ret));
} else if (OB_FAIL(index_builder_.init_meta_iter(iter))) { } else if (OB_FAIL(index_builder_.init_meta_iter(iter))) {
STORAGE_LOG(WARN, "fail to init meta iter", K(ret), K(index_builder_)); STORAGE_LOG(WARN, "fail to init meta iter", K(ret), K(index_builder_));
} else if (OB_FAIL(check_need_rebuild(end_scn, macro_id_array, iter, multiplexed_macro_block_count))) { } else if (OB_FAIL(check_need_rebuild(merge_param, macro_id_array, iter, multiplexed_macro_block_count))) {
STORAGE_LOG(WARN, "failed to check need rebuild", K(ret)); STORAGE_LOG(WARN, "failed to check need rebuild", K(ret));
} else if (macro_id_array.count() != 0) { } else if (macro_id_array.count() != 0) {
iter.reuse(); iter.reuse();
@ -208,8 +208,8 @@ int ObSSTableBuilder::build_sstable_merge_res(
int ObSSTableBuilder::open_macro_writer() int ObSSTableBuilder::open_macro_writer()
{ {
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
blocksstable::ObMacroDataSeq macro_start_seq(0); blocksstable::ObMacroDataSeq macro_start_seq(0);
data_store_desc_.get_desc().sstable_index_builder_ = &rebuild_index_builder_; data_store_desc_.get_desc().sstable_index_builder_ = &rebuild_index_builder_;
macro_start_seq.set_rebuild_merge_type(); macro_start_seq.set_rebuild_merge_type();
@ -220,7 +220,22 @@ int ObSSTableBuilder::open_macro_writer()
return ret; return ret;
} }
int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn, int ObSSTableBuilder::pre_check_rebuild(const ObStaticMergeParam &merge_param, bool &need_check_rebuild)
{
int ret = OB_SUCCESS;
need_check_rebuild = true;
const int64_t data_version = data_store_desc_.get_desc().get_major_working_cluster_version();
if (data_version < DATA_VERSION_4_3_0_0) {
need_check_rebuild = false;
} else if (data_version >= DATA_VERSION_4_3_2_0) {
if (merge_param.concurrent_cnt_ <= 1) {
need_check_rebuild = false;
}
}
return ret;
}
int ObSSTableBuilder::check_need_rebuild(const ObStaticMergeParam &merge_param,
ObIArray<blocksstable::MacroBlockId> &macro_id_array, ObIArray<blocksstable::MacroBlockId> &macro_id_array,
MetaIter &iter, MetaIter &iter,
int64_t &multiplexed_macro_block_count) int64_t &multiplexed_macro_block_count)
@ -228,15 +243,17 @@ int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn,
int ret = OB_SUCCESS; int ret = OB_SUCCESS;
macro_id_array.reset(); macro_id_array.reset();
multiplexed_macro_block_count = 0; multiplexed_macro_block_count = 0;
const int64_t snapshot_version = end_scn.get_val_for_tx(); const int64_t snapshot_version = merge_param.scn_range_.end_scn_.get_val_for_tx();
if (data_store_desc_.get_desc().get_major_working_cluster_version() < DATA_VERSION_4_3_0_0) { const blocksstable::ObDataMacroBlockMeta *macro_meta;
} else { blocksstable::MacroBlockId last_macro_id;
const blocksstable::ObDataMacroBlockMeta *macro_meta; int64_t last_macro_block_sum = 0;
blocksstable::MacroBlockId last_macro_id; int64_t reduce_macro_block_cnt = 0;
int64_t last_macro_block_sum = 0; bool last_macro_is_first = false;
int64_t reduce_macro_block_cnt = 0; bool need_check_rebuild = true;
bool last_macro_is_first = false;
if (OB_FAIL(pre_check_rebuild(merge_param, need_check_rebuild))) {
STORAGE_LOG(WARN, "Fail to pre check need rebuild", K(ret));
} else if (need_check_rebuild) {
while (OB_SUCC(ret) && OB_SUCC(iter.get_next_macro_block(macro_meta))) { while (OB_SUCC(ret) && OB_SUCC(iter.get_next_macro_block(macro_meta))) {
if (OB_ISNULL(macro_meta)) { if (OB_ISNULL(macro_meta)) {
ret = OB_ERR_UNEXPECTED; ret = OB_ERR_UNEXPECTED;
@ -252,7 +269,7 @@ int ObSSTableBuilder::check_need_rebuild(const share::SCN end_scn,
last_macro_is_first = true; last_macro_is_first = true;
last_macro_block_sum = macro_block_sum; last_macro_block_sum = macro_block_sum;
multiplexed_macro_block_count = snapshot_version != macro_meta->val_.snapshot_version_ ? multiplexed_macro_block_count = snapshot_version != macro_meta->val_.snapshot_version_ ?
multiplexed_macro_block_count + 1 : multiplexed_macro_block_count; multiplexed_macro_block_count + 1 : multiplexed_macro_block_count;
} else { } else {
if (last_macro_is_first && OB_FAIL(macro_id_array.push_back(last_macro_id))) { if (last_macro_is_first && OB_FAIL(macro_id_array.push_back(last_macro_id))) {
STORAGE_LOG(WARN, "failed to push back macro id", K(ret), K(last_macro_id)); STORAGE_LOG(WARN, "failed to push back macro id", K(ret), K(last_macro_id));

View File

@ -26,6 +26,7 @@ class MacroBlockId;
} }
namespace compaction namespace compaction
{ {
struct ObStaticMergeParam;
class ObSSTableRebuildMicroBlockIter final class ObSSTableRebuildMicroBlockIter final
{ {
@ -80,7 +81,7 @@ public:
void reset(); void reset();
int set_index_read_info(const ObITableReadInfo *read_info); int set_index_read_info(const ObITableReadInfo *read_info);
int prepare_index_builder(); int prepare_index_builder();
int build_sstable_merge_res(const share::SCN end_scn, ObSSTableMergeInfo &sstable_merge_info_, blocksstable::ObSSTableMergeRes &res); int build_sstable_merge_res(const ObStaticMergeParam &merge_param, ObSSTableMergeInfo &sstable_merge_info_, blocksstable::ObSSTableMergeRes &res);
int build_reused_small_sst_merge_res( int build_reused_small_sst_merge_res(
const int64_t macro_read_size, const int64_t macro_read_size,
const int64_t macro_offset, const int64_t macro_offset,
@ -100,10 +101,11 @@ private:
const blocksstable::ObDataMacroBlockMeta &curr_macro_meta, const blocksstable::ObDataMacroBlockMeta &curr_macro_meta,
bool &need_merge); bool &need_merge);
int rewrite_macro_block(ObSSTableRebuildMicroBlockIter &micro_iter); int rewrite_macro_block(ObSSTableRebuildMicroBlockIter &micro_iter);
int check_need_rebuild(const share::SCN end_scn, int check_need_rebuild(const ObStaticMergeParam &merge_param,
ObIArray<blocksstable::MacroBlockId> &macro_id_array, ObIArray<blocksstable::MacroBlockId> &macro_id_array,
MetaIter &iter, MetaIter &iter,
int64_t &multiplexed_macro_block_count); int64_t &multiplexed_macro_block_count);
int pre_check_rebuild(const ObStaticMergeParam &merge_param, bool &need_check_rebuild);
bool check_macro_block_could_merge(const blocksstable::ObDataMacroBlockMeta &macro_meta) const bool check_macro_block_could_merge(const blocksstable::ObDataMacroBlockMeta &macro_meta) const
{ {
return data_store_desc_.get_desc().get_row_store_type() == macro_meta.val_.row_store_type_ return data_store_desc_.get_desc().get_row_store_type() == macro_meta.val_.row_store_type_

View File

@ -231,7 +231,7 @@ int ObTabletMergeInfo::create_sstable(
SMART_VARS_2((ObSSTableMergeRes, res), (ObTabletCreateSSTableParam, param)) { SMART_VARS_2((ObSSTableMergeRes, res), (ObTabletCreateSSTableParam, param)) {
if (!is_reused_small_sst if (!is_reused_small_sst
&& OB_FAIL(sstable_builder_.build_sstable_merge_res(ctx.static_param_.scn_range_.end_scn_, sstable_merge_info_, res))) { && OB_FAIL(sstable_builder_.build_sstable_merge_res(ctx.static_param_, sstable_merge_info_, res))) {
LOG_WARN("fail to close index builder", K(ret), KPC(sstable), "is_small_sst", sstable->is_small_sstable()); LOG_WARN("fail to close index builder", K(ret), KPC(sstable), "is_small_sst", sstable->is_small_sstable());
CTX_SET_DIAGNOSE_LOCATION(ctx); CTX_SET_DIAGNOSE_LOCATION(ctx);
} else if (is_reused_small_sst && OB_FAIL(sstable_builder_.build_reused_small_sst_merge_res(sstable->get_macro_read_size(), } else if (is_reused_small_sst && OB_FAIL(sstable_builder_.build_reused_small_sst_merge_res(sstable->get_macro_read_size(),