reuse micro block for co merge with direct load data && fix myseltests
This commit is contained in:
parent
e197d54ef2
commit
ad2cb0ac0e
@ -493,7 +493,8 @@ int ObCOTabletMergeCtx::create_sstables(const uint32_t start_cg_idx, const uint3
|
||||
} else {
|
||||
merged_sstable_array_[i] = table_handle.get_table();
|
||||
exist_cg_tables_cnt++;
|
||||
LOG_TRACE("success to create sstable", K(ret), K(i), K(table_handle), "merged_cg_tables_count", count);
|
||||
const ObSSTable *new_sstable = static_cast<ObSSTable *>(table_handle.get_table());
|
||||
FLOG_INFO("success to create sstable", KPC(new_sstable), KPC(cg_schema_ptr), "cg_idx", i);
|
||||
}
|
||||
} // for
|
||||
if (OB_FAIL(ret)) {
|
||||
|
@ -581,11 +581,11 @@ int ObCOMergeRowWriter::process(
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObMacroBlockOp block_op;
|
||||
if (OB_NOT_NULL(progressive_merge_helper_) && progressive_merge_helper_->is_valid()) {
|
||||
if (OB_FAIL(progressive_merge_helper_->check_macro_block_op(macro_desc, block_op))) {
|
||||
if (OB_ISNULL(progressive_merge_helper_) || !progressive_merge_helper_->is_valid()) {
|
||||
// do nothing
|
||||
} else if (OB_FAIL(progressive_merge_helper_->check_macro_block_op(macro_desc, block_op))) {
|
||||
STORAGE_LOG(WARN, "failed to check macro operation", K(ret), K(macro_desc));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (block_op.is_rewrite()) {
|
||||
@ -601,7 +601,7 @@ int ObCOMergeRowWriter::process(
|
||||
} else if (OB_FAIL(write_helper_.append_macro_block(macro_desc, micro_block_data))) {
|
||||
STORAGE_LOG(WARN, "failed to append macro block", K(ret), K(macro_desc));
|
||||
}
|
||||
|
||||
STORAGE_LOG(DEBUG, "process micro data", K(ret), K(macro_desc), K(block_op), K(micro_block_data), KPC(this));
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
@ -375,7 +375,11 @@ int ObCOMerger::close()
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObCOMerger::build_mergelog(const blocksstable::ObDatumRow &row, ObMergeLog &merge_log, bool &need_replay, bool &row_store_iter_need_move)
|
||||
int ObCOMerger::build_mergelog(
|
||||
const blocksstable::ObDatumRow &row,
|
||||
ObMergeLog &merge_log,
|
||||
bool &need_replay,
|
||||
bool &row_store_iter_need_move)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t cmp_ret = 0;
|
||||
|
@ -335,8 +335,7 @@ int ObPartitionMergeIter::check_merge_range_cross(ObDatumRange &data_range, bool
|
||||
}
|
||||
|
||||
// safe to modify range of curr_macro_block with overwriting ptr only
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(merge_range_.get_start_key().compare(data_range.get_start_key(),
|
||||
if (FAILEDx(merge_range_.get_start_key().compare(data_range.get_start_key(),
|
||||
*datum_utils,
|
||||
cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Failed to compare start key", K(ret), K_(merge_range), K(data_range));
|
||||
@ -344,8 +343,7 @@ int ObPartitionMergeIter::check_merge_range_cross(ObDatumRange &data_range, bool
|
||||
data_range.start_key_ = merge_range_.get_start_key();
|
||||
range_cross = true;
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(merge_range_.get_end_key().compare(data_range.get_end_key(),
|
||||
if (FAILEDx(merge_range_.get_end_key().compare(data_range.get_end_key(),
|
||||
*datum_utils,
|
||||
cmp_ret))) {
|
||||
STORAGE_LOG(WARN, "Failed to compare end key", K(ret), K_(merge_range), K(data_range));
|
||||
@ -898,7 +896,8 @@ ObPartitionMicroMergeIter::ObPartitionMicroMergeIter(common::ObIAllocator &alloc
|
||||
curr_micro_block_(nullptr),
|
||||
micro_block_opened_(false),
|
||||
macro_reader_(),
|
||||
need_reuse_micro_block_(true)
|
||||
need_reuse_micro_block_(true),
|
||||
need_check_schema_version_(false)
|
||||
{
|
||||
}
|
||||
|
||||
@ -917,6 +916,7 @@ void ObPartitionMicroMergeIter::reset()
|
||||
curr_micro_block_ = nullptr;
|
||||
micro_block_opened_ = false;
|
||||
need_reuse_micro_block_ = true;
|
||||
need_check_schema_version_ = false;
|
||||
ObPartitionMacroMergeIter::reset();
|
||||
}
|
||||
|
||||
@ -963,15 +963,17 @@ int ObPartitionMicroMergeIter::inner_init(const ObMergeParameter &merge_param)
|
||||
} else {
|
||||
curr_micro_block_ = nullptr;
|
||||
micro_block_opened_ = false;
|
||||
need_check_schema_version_ = merge_param.static_param_.data_version_ < DATA_VERSION_4_3_3_0;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
// check before open each macro block
|
||||
void ObPartitionMicroMergeIter::check_need_reuse_micro_block()
|
||||
{
|
||||
if (curr_block_desc_.schema_version_ <= 0 || curr_block_desc_.schema_version_ != schema_version_) {
|
||||
if (curr_block_desc_.schema_version_ <= 0) {
|
||||
need_reuse_micro_block_ = false;
|
||||
} else if (curr_block_desc_.schema_version_ != schema_version_ && need_check_schema_version_) {
|
||||
need_reuse_micro_block_ = false;
|
||||
} else {
|
||||
need_reuse_micro_block_ = true;
|
||||
|
@ -276,8 +276,8 @@ public:
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int check_row_changed(const blocksstable::ObDatumRow &row, const int64_t row_id, bool &is_changed) override;
|
||||
INHERIT_TO_STRING_KV("ObPartitionMicroMergeIter", ObPartitionMacroMergeIter, K_(micro_block_opened),
|
||||
K_(need_reuse_micro_block), KPC(curr_micro_block_), KP_(micro_row_scanner));
|
||||
INHERIT_TO_STRING_KV("ObPartitionMicroMergeIter", ObPartitionMacroMergeIter, K_(micro_block_opened), K_(need_reuse_micro_block),
|
||||
K_(need_check_schema_version), KPC(curr_micro_block_), KP_(micro_row_scanner));
|
||||
private:
|
||||
virtual int inner_init(const ObMergeParameter &merge_param) override;
|
||||
virtual bool inner_check(const ObMergeParameter &merge_param) override;
|
||||
@ -291,6 +291,7 @@ private:
|
||||
bool micro_block_opened_;
|
||||
blocksstable::ObMacroBlockReader macro_reader_;
|
||||
bool need_reuse_micro_block_;
|
||||
bool need_check_schema_version_; // used for reuse micro block
|
||||
};
|
||||
|
||||
class ObPartitionMinorRowMergeIter : public ObPartitionMergeIter
|
||||
|
@ -20,6 +20,8 @@ using namespace blocksstable;
|
||||
using namespace common;
|
||||
namespace compaction
|
||||
{
|
||||
ERRSIM_POINT_DEF(EN_CO_MERGE_REUSE_MICRO);
|
||||
|
||||
ObProgressiveMergeMgr::ObProgressiveMergeMgr()
|
||||
: progressive_merge_round_(0),
|
||||
progressive_merge_num_(0),
|
||||
@ -141,7 +143,6 @@ int ObProgressiveMergeHelper::init(
|
||||
mgr_ = mgr; // init mgr first
|
||||
int64_t rewrite_macro_cnt = 0, reduce_macro_cnt = 0, rewrite_block_cnt_for_progressive = 0;
|
||||
|
||||
const int64_t compare_progressive_merge_round = get_compare_progressive_round();
|
||||
if (OB_FAIL(collect_macro_info(sstable, merge_param, rewrite_macro_cnt, reduce_macro_cnt, rewrite_block_cnt_for_progressive))) {
|
||||
LOG_WARN("Fail to scan secondary meta", K(ret), K(merge_param));
|
||||
} else if (need_calc_progressive_merge()) {
|
||||
@ -273,18 +274,23 @@ int ObProgressiveMergeHelper::check_macro_block_op(const ObMacroBlockDesc ¯o
|
||||
rewrite_block_cnt_++;
|
||||
}
|
||||
}
|
||||
if (block_op.is_none()) {
|
||||
if (!check_macro_need_merge_) {
|
||||
} else if (macro_desc.macro_meta_->val_.data_zsize_ < REWRITE_MACRO_SIZE_THRESHOLD) {
|
||||
// before 432 we need rewrite this macro block
|
||||
if (data_version_ < DATA_VERSION_4_3_2_0) {
|
||||
if (block_op.is_none() && check_macro_need_merge_) {
|
||||
bool need_set_block_op = macro_desc.macro_meta_->val_.data_zsize_ < REWRITE_MACRO_SIZE_THRESHOLD;
|
||||
#ifdef ERRSIM
|
||||
if (OB_UNLIKELY(EN_CO_MERGE_REUSE_MICRO)) {
|
||||
ret = OB_SUCCESS;
|
||||
need_set_block_op = true;
|
||||
FLOG_INFO("ERRSIM EN_CO_MERGE_REUSE_MICRO", KR(ret), K(need_set_block_op));
|
||||
}
|
||||
#endif
|
||||
if (!need_set_block_op) {
|
||||
} else if (data_version_ < DATA_VERSION_4_3_2_0) {
|
||||
block_op.set_rewrite();
|
||||
} else {
|
||||
block_op.set_reorg();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user