optimaze macro merge check
This commit is contained in:
parent
530427e05c
commit
f13943fc42
@ -191,9 +191,10 @@ private:
|
||||
int flush_bf_to_cache(ObMacroBloomFilterCacheWriter &bf_cache_writer, const int32_t row_count);
|
||||
void dump_micro_block(ObIMicroBlockWriter µ_writer);
|
||||
void dump_macro_block(ObMacroBlock ¯o_block);
|
||||
public:
|
||||
static const int64_t DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD = 30;
|
||||
private:
|
||||
static const int64_t DEFAULT_MACRO_BLOCK_COUNT = 128;
|
||||
static const int64_t DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD = 30;
|
||||
typedef common::ObSEArray<MacroBlockId, DEFAULT_MACRO_BLOCK_COUNT> MacroBlockList;
|
||||
|
||||
protected:
|
||||
|
@ -47,6 +47,7 @@ ObPartitionMerger::ObPartitionMerger()
|
||||
minimum_iters_(DEFAULT_ITER_ARRAY_SIZE, ModulePageAllocator(allocator_)),
|
||||
base_iter_(nullptr),
|
||||
task_idx_(0),
|
||||
check_macro_need_merge_(false),
|
||||
is_inited_(false)
|
||||
{
|
||||
}
|
||||
@ -59,6 +60,7 @@ ObPartitionMerger::~ObPartitionMerger()
|
||||
void ObPartitionMerger::reset()
|
||||
{
|
||||
is_inited_ = false;
|
||||
check_macro_need_merge_ = false;
|
||||
task_idx_ = 0;
|
||||
base_iter_ = nullptr;
|
||||
minimum_iters_.reset();
|
||||
@ -324,10 +326,11 @@ int ObPartitionMerger::merge_macro_block_iter(MERGE_ITER_ARRAY &minimum_iters, i
|
||||
int ObPartitionMerger::try_rewrite_macro_block(const ObMacroBlockDesc ¯o_desc, bool &rewrite)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
rewrite = false;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
STORAGE_LOG(WARN, "ObPartitionMerger is not inited", K(ret), K(*this));
|
||||
} else if (OB_FAIL(macro_writer_->check_data_macro_block_need_merge(macro_desc, rewrite))) {
|
||||
} else if (check_macro_need_merge_ && OB_FAIL(macro_writer_->check_data_macro_block_need_merge(macro_desc, rewrite))) {
|
||||
STORAGE_LOG(WARN, "Failed to check data macro block need merge", K(ret), K(macro_desc));
|
||||
}
|
||||
|
||||
@ -378,6 +381,85 @@ int ObPartitionMerger::prepare_merge_partition(ObMergeParameter &merge_param,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionMerger::get_macro_block_count_to_rewrite(const ObMergeParameter &merge_param,
|
||||
int64_t &need_rewrite_block_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
need_rewrite_block_cnt = 0;
|
||||
check_macro_need_merge_ = true;
|
||||
if (merge_ctx_->is_full_merge_ || merge_ctx_->tables_handle_.get_count() == 0) {
|
||||
// minor merge and full merge no need to calculate rewrite block cnt
|
||||
} else if (!merge_ctx_->tables_handle_.get_tables().at(0)->is_sstable()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "Unexpected first table for major merge", K(ret), K(merge_ctx_->tables_handle_));
|
||||
} else {
|
||||
ObSSTable *first_sstable = nullptr;
|
||||
int64_t rewrite_macro_cnt = 0, reduce_macro_cnt = 0, rewrite_block_cnt_for_progressive = 0;
|
||||
bool last_is_small_data_macro = false;
|
||||
const int64_t progressive_merge_num = merge_ctx_->progressive_merge_num_;
|
||||
const bool need_calc_progressive_merge = is_major_merge_type(merge_param.merge_type_) && merge_ctx_->progressive_merge_step_ < progressive_merge_num;
|
||||
const bool need_check_macro_merge = !is_major_merge_type(merge_param.merge_type_) || data_store_desc_.major_working_cluster_version_ >= DATA_VERSION_4_1_0_0;
|
||||
if (OB_ISNULL(first_sstable = static_cast<ObSSTable *>(merge_ctx_->tables_handle_.get_tables().at(0)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected null first sstable", K(ret), K(merge_ctx_->tables_handle_));
|
||||
} else if (need_calc_progressive_merge || need_check_macro_merge) {
|
||||
ObSSTableSecMetaIterator *sec_meta_iter = nullptr;
|
||||
ObDataMacroBlockMeta macro_meta;
|
||||
if (OB_FAIL(first_sstable->scan_secondary_meta(
|
||||
allocator_,
|
||||
merge_param.merge_range_,
|
||||
merge_ctx_->tablet_handle_.get_obj()->get_index_read_info(),
|
||||
blocksstable::DATA_BLOCK_META,
|
||||
sec_meta_iter))) {
|
||||
LOG_WARN("Fail to scan secondary meta", K(ret), K(merge_param.merge_range_));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
STORAGE_LOG(WARN, "Failed to get next macro block", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (macro_meta.val_.progressive_merge_round_ < merge_ctx_->progressive_merge_round_) {
|
||||
++rewrite_block_cnt_for_progressive;
|
||||
}
|
||||
if (macro_meta.val_.data_zsize_ < data_store_desc_.macro_block_size_ *
|
||||
ObMacroBlockWriter::DEFAULT_MACRO_BLOCK_REWRTIE_THRESHOLD / 100) {
|
||||
rewrite_macro_cnt++;
|
||||
if (last_is_small_data_macro) {
|
||||
reduce_macro_cnt++;
|
||||
}
|
||||
last_is_small_data_macro = true;
|
||||
} else {
|
||||
last_is_small_data_macro = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(sec_meta_iter)) {
|
||||
sec_meta_iter->~ObSSTableSecMetaIterator();
|
||||
allocator_.free(sec_meta_iter);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (need_calc_progressive_merge) {
|
||||
need_rewrite_block_cnt = std::max(rewrite_block_cnt_for_progressive / (progressive_merge_num -
|
||||
merge_ctx_->progressive_merge_step_), 1L);
|
||||
STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", merge_ctx_->param_.tablet_id_,
|
||||
K(need_rewrite_block_cnt), K(merge_ctx_->progressive_merge_step_),
|
||||
K(merge_ctx_->progressive_merge_num_), K(merge_ctx_->progressive_merge_round_));
|
||||
}
|
||||
if (need_check_macro_merge) {
|
||||
check_macro_need_merge_ = rewrite_macro_cnt <= (reduce_macro_cnt * 2);
|
||||
STORAGE_LOG(INFO, "finish macro block need merge check", K(check_macro_need_merge_), K(rewrite_macro_cnt), K(reduce_macro_cnt));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void ObPartitionMerger::set_base_iter(const MERGE_ITER_ARRAY &minimum_iters)
|
||||
{
|
||||
int64_t count = minimum_iters.count();
|
||||
@ -523,7 +605,7 @@ int ObPartitionMajorMerger::merge_partition(ObTabletMergeCtx &ctx, const int64_t
|
||||
if (merge_helper.is_iter_end()) {
|
||||
ret = OB_ITER_END;
|
||||
} else if (is_major_merge_type(merge_param.merge_type_)
|
||||
&& OB_FAIL(get_macro_block_count_to_rewrite(merge_param.merge_range_, need_rewrite_block_cnt_))) {
|
||||
&& OB_FAIL(get_macro_block_count_to_rewrite(merge_param, need_rewrite_block_cnt_))) {
|
||||
STORAGE_LOG(WARN, "Failed to compute the count of macro block to rewrite", K(ret));
|
||||
} else if (OB_FAIL(merge_helper.has_incremental_data(has_incremental_data))) {
|
||||
STORAGE_LOG(WARN, "Failed to check has_incremental_data", K(ret), K(merge_helper));
|
||||
@ -601,62 +683,6 @@ int ObPartitionMajorMerger::merge_partition(ObTabletMergeCtx &ctx, const int64_t
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionMajorMerger::get_macro_block_count_to_rewrite(const ObDatumRange &merge_range,
|
||||
int64_t &need_rewrite_block_cnt)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
need_rewrite_block_cnt = 0;
|
||||
if (merge_ctx_->is_full_merge_ || merge_ctx_->tables_handle_.get_count() == 0) {
|
||||
// minor merge and full merge no need to calculate rewrite block cnt
|
||||
} else if (!merge_ctx_->tables_handle_.get_tables().at(0)->is_sstable()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(ERROR, "Unexpected first table for major merge", K(ret), K(merge_ctx_->tables_handle_));
|
||||
} else {
|
||||
ObSSTable *first_sstable = nullptr;
|
||||
const int64_t progressive_merge_num = merge_ctx_->progressive_merge_num_;
|
||||
if (OB_ISNULL(first_sstable = static_cast<ObSSTable *>(merge_ctx_->tables_handle_.get_tables().at(0)))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected null first sstable", K(ret), K(merge_ctx_->tables_handle_));
|
||||
} else if (merge_ctx_->progressive_merge_step_ < progressive_merge_num) {
|
||||
ObSSTableSecMetaIterator *sec_meta_iter = nullptr;
|
||||
ObDataMacroBlockMeta macro_meta;
|
||||
if (OB_FAIL(first_sstable->scan_secondary_meta(
|
||||
allocator_,
|
||||
merge_range,
|
||||
merge_ctx_->tablet_handle_.get_obj()->get_index_read_info(),
|
||||
blocksstable::DATA_BLOCK_META,
|
||||
sec_meta_iter))) {
|
||||
LOG_WARN("Fail to scan secondary meta", K(ret), K(merge_range));
|
||||
}
|
||||
while (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(sec_meta_iter->get_next(macro_meta))) {
|
||||
if (OB_ITER_END != ret) {
|
||||
STORAGE_LOG(WARN, "Failed to get next macro block", K(ret));
|
||||
} else {
|
||||
ret = OB_SUCCESS;
|
||||
break;
|
||||
}
|
||||
} else if (macro_meta.val_.progressive_merge_round_ < merge_ctx_->progressive_merge_round_) {
|
||||
++need_rewrite_block_cnt;
|
||||
}
|
||||
}
|
||||
if (OB_NOT_NULL(sec_meta_iter)) {
|
||||
sec_meta_iter->~ObSSTableSecMetaIterator();
|
||||
allocator_.free(sec_meta_iter);
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
need_rewrite_block_cnt = std::max(need_rewrite_block_cnt / (progressive_merge_num -
|
||||
merge_ctx_->progressive_merge_step_), 1L);
|
||||
STORAGE_LOG(INFO, "There are some macro block need rewrite", "tablet_id", merge_ctx_->param_.tablet_id_,
|
||||
K(need_rewrite_block_cnt), K(merge_ctx_->progressive_merge_step_),
|
||||
K(merge_ctx_->progressive_merge_num_), K(merge_ctx_->progressive_merge_round_));
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObPartitionMajorMerger::merge_same_rowkey_iters(MERGE_ITER_ARRAY &merge_iters)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1071,11 +1097,15 @@ int ObPartitionMinorMerger::merge_partition(ObTabletMergeCtx &ctx, const int64_t
|
||||
int ret = OB_SUCCESS;
|
||||
ObPartitionMinorMergeHelper merge_helper;
|
||||
ObMergeParameter merge_param;
|
||||
int64_t need_rewrite_block_cnt;
|
||||
|
||||
if (OB_FAIL(open(ctx, idx))) {
|
||||
STORAGE_LOG(WARN, "Failed to open partition minor merge fuse", K(ret));
|
||||
} else if (OB_FAIL(prepare_merge_partition(merge_param, merge_helper))) {
|
||||
STORAGE_LOG(WARN, "Failed to prepare merge partition", K(ret));
|
||||
} else if (!is_mini_merge(merge_param.merge_type_)
|
||||
&& OB_FAIL(get_macro_block_count_to_rewrite(merge_param, need_rewrite_block_cnt))) {
|
||||
STORAGE_LOG(WARN, "failed to get macro count rewrite", K(ret), K(merge_param));
|
||||
} else {
|
||||
int64_t reuse_row_cnt = 0;
|
||||
MERGE_ITER_ARRAY rowkey_minimum_iters;
|
||||
|
@ -65,6 +65,8 @@ protected:
|
||||
int open_macro_writer(ObMergeParameter &merge_param);
|
||||
int prepare_merge_partition(ObMergeParameter &merge_param,
|
||||
ObPartitionMergeHelper &merge_helper);
|
||||
int get_macro_block_count_to_rewrite(const ObMergeParameter &merge_param,
|
||||
int64_t &need_rewrite_block_cnt);
|
||||
int check_row_columns(const blocksstable::ObDatumRow &row);
|
||||
int try_filter_row(const blocksstable::ObDatumRow &row, ObICompactionFilter::ObFilterRet &filter_ret);
|
||||
int get_base_iter_curr_macro_block(const blocksstable::ObMacroBlockDesc *¯o_desc);
|
||||
@ -82,6 +84,7 @@ protected:
|
||||
MERGE_ITER_ARRAY minimum_iters_;
|
||||
ObPartitionMergeIter *base_iter_;
|
||||
int64_t task_idx_;
|
||||
bool check_macro_need_merge_;
|
||||
bool is_inited_;
|
||||
};
|
||||
|
||||
@ -102,8 +105,6 @@ protected:
|
||||
private:
|
||||
int merge_micro_block_iter(ObPartitionMergeIter &iter, int64_t &reuse_row_cnt);
|
||||
int reuse_base_sstable(ObPartitionMajorMergeHelper &merge_helper);
|
||||
int get_macro_block_count_to_rewrite(const blocksstable::ObDatumRange &merge_range,
|
||||
int64_t &need_rewrite_block_cnt);
|
||||
private:
|
||||
int64_t rewrite_block_cnt_;
|
||||
int64_t need_rewrite_block_cnt_;
|
||||
|
Loading…
x
Reference in New Issue
Block a user