From 430086798f3f5b445ec78841451324e9a5b6828e Mon Sep 17 00:00:00 2001 From: ZenoWang Date: Tue, 21 May 2024 05:26:58 +0000 Subject: [PATCH] [CP] [FIX] make split_range feel range closed or opened --- mittest/mtlenv/test_tx_data_table.cpp | 4 +- src/storage/access/ob_table_estimator.cpp | 16 +++--- .../ob_partition_parallel_merge_ctx.cpp | 15 ++--- src/storage/memtable/ob_memtable.cpp | 57 +++++++++++++------ src/storage/memtable/ob_memtable.h | 7 ++- src/storage/memtable/ob_memtable_interface.h | 18 +++--- src/storage/ob_partition_range_spliter.cpp | 33 +++-------- src/storage/tx_table/ob_tx_data_memtable.cpp | 6 +- src/storage/tx_table/ob_tx_data_memtable.h | 3 +- 9 files changed, 80 insertions(+), 79 deletions(-) diff --git a/mittest/mtlenv/test_tx_data_table.cpp b/mittest/mtlenv/test_tx_data_table.cpp index 18fe919bd..ddada3ce6 100644 --- a/mittest/mtlenv/test_tx_data_table.cpp +++ b/mittest/mtlenv/test_tx_data_table.cpp @@ -412,7 +412,9 @@ void TestTxDataTable::do_basic_test() int64_t inserted_cnt_after_pre_process = freezing_memtable->get_tx_data_count(); ASSERT_EQ(inserted_cnt_before_pre_process + 1, inserted_cnt_after_pre_process); - ASSERT_EQ(OB_SUCCESS, freezing_memtable->get_split_ranges(nullptr, nullptr, range_cnt, range_array)); + ObStoreRange input_range; + input_range.set_whole_range(); + ASSERT_EQ(OB_SUCCESS, freezing_memtable->get_split_ranges(input_range, range_cnt, range_array)); int64_t pre_range_end_key = 0; for (int i = 0; i < range_cnt; i++) { auto &range = range_array[i]; diff --git a/src/storage/access/ob_table_estimator.cpp b/src/storage/access/ob_table_estimator.cpp index 012f228a7..df13d24a9 100644 --- a/src/storage/access/ob_table_estimator.cpp +++ b/src/storage/access/ob_table_estimator.cpp @@ -159,14 +159,14 @@ int ObTableEstimator::estimate_multi_scan_row_count( ObPartitionEst sub_range_cost; const static int64_t sub_range_cnt = 3; ObSEArray store_ranges; - if (OB_FAIL((static_cast(current_table))->get_split_ranges( - &range.get_start_key().get_store_rowkey(), - &range.get_end_key().get_store_rowkey(), - sub_range_cnt, - store_ranges))) { - if (OB_ENTRY_NOT_EXIST != ret) { - LOG_WARN("Failed to split ranges", K(ret), K(tmp_cost)); - } + ObStoreRange input_range; + input_range.set_start_key(range.get_start_key().get_store_rowkey()); + input_range.set_end_key(range.get_end_key().get_store_rowkey()); + range.is_left_closed() ? input_range.set_left_closed() : input_range.set_left_open(); + range.is_right_closed() ? input_range.set_right_closed() : input_range.set_right_open(); + if (OB_FAIL((static_cast(current_table)) + ->get_split_ranges(input_range, sub_range_cnt, store_ranges))) { + LOG_WARN("Failed to split ranges", K(ret), K(tmp_cost)); } else if (store_ranges.count() > 1) { LOG_TRACE("estimated logical row count may be not right, split range and do estimating again", K(tmp_cost), K(store_ranges)); common::ObArenaAllocator allocator("OB_STORAGE_EST", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); diff --git a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp index c8dc8b9c4..be6fcd601 100644 --- a/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp +++ b/src/storage/compaction/ob_partition_parallel_merge_ctx.cpp @@ -287,22 +287,23 @@ int ObParallelMergeCtx::init_parallel_mini_merge(compaction::ObBasicTabletMergeC ObArray store_ranges; store_ranges.set_attr(lib::ObMemAttr(MTL_ID(), "TmpMiniRanges", ObCtxIds::MERGE_NORMAL_CTX_ID)); + ObStoreRange input_range; + input_range.set_whole_range(); if (concurrent_cnt_ <= 1) { if (OB_FAIL(init_serial_merge())) { STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret)); } - } else if (OB_FAIL(memtable->get_split_ranges(nullptr, nullptr, concurrent_cnt_, store_ranges))) { - if (OB_ENTRY_NOT_EXIST == ret) { + } else if (OB_FAIL(memtable->get_split_ranges(input_range, concurrent_cnt_, store_ranges))) { + STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret)); + } else if (OB_UNLIKELY(store_ranges.count() != concurrent_cnt_)) { + if (1 == store_ranges.count()) { if (OB_FAIL(init_serial_merge())) { STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret)); } } else { - STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret)); + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "Unexpected range array and concurrent_cnt", K(ret), K_(concurrent_cnt), K(store_ranges)); } - } else if (OB_UNLIKELY(store_ranges.count() != concurrent_cnt_)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "Unexpected range array and concurrent_cnt", K(ret), K_(concurrent_cnt), - K(store_ranges)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < store_ranges.count(); i++) { ObDatumRange datum_range; diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index b9cd5d55a..d7a368d49 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -321,7 +321,7 @@ int ObMemtable::safe_to_destroy(bool &is_safe) if (!is_safe && ret == OB_STATE_NOT_MATCH) { ret = OB_SUCCESS; bool is_done = false; - share::LSN end_lsn; + LSN end_lsn; if (OB_FAIL(MTL(logservice::ObLogService*)->get_log_apply_service()-> is_apply_done(ls_handle_.get_ls()->get_ls_id(), is_done, @@ -2201,17 +2201,17 @@ int ObMemtable::estimate_phy_size(const ObStoreRowkey* start_key, const ObStoreR return ret; } -int ObMemtable::get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, const int64_t part_cnt, ObIArray &range_array) +int ObMemtable::get_split_ranges(const ObStoreRange &input_range, + const int64_t part_cnt, + ObIArray &range_array) { int ret = OB_SUCCESS; + range_array.reuse(); + const ObStoreRowkey *start_key = &input_range.get_start_key(); + const ObStoreRowkey *end_key = &input_range.get_end_key(); ObMemtableKey start_mtk; ObMemtableKey end_mtk; - if (NULL == start_key) { - start_key = &ObStoreRowkey::MIN_STORE_ROWKEY; - } - if (NULL == end_key) { - end_key = &ObStoreRowkey::MAX_STORE_ROWKEY; - } + if (part_cnt < 1) { ret = OB_INVALID_ARGUMENT; TRANS_LOG(WARN, "part cnt need be greater than 1", K(ret), K(part_cnt)); @@ -2220,6 +2220,25 @@ int ObMemtable::get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRo } else if (OB_FAIL(query_engine_.split_range(&start_mtk, &end_mtk, part_cnt, range_array))) { TRANS_LOG(WARN, "estimate row count fail", K(ret), K_(key)); } + + if (OB_ENTRY_NOT_EXIST == ret) { + // construct a single range if split failed + ret = OB_SUCCESS; + ObStoreRange merge_range; + merge_range.set_start_key(*start_key); + merge_range.set_end_key(*end_key); + if (OB_FAIL(range_array.push_back(merge_range))) { + STORAGE_LOG(WARN, "push back merge range to range array failed", KR(ret), K(merge_range)); + } + } + + if (OB_SUCC(ret) && !range_array.empty()) { + // set range closed or open + ObStoreRange &first_range = range_array.at(0); + ObStoreRange &last_range = range_array.at(range_array.count() - 1); + input_range.get_border_flag().inclusive_start() ? first_range.set_left_closed() : first_range.set_left_open(); + input_range.get_border_flag().inclusive_end() ? last_range.set_right_closed() : last_range.set_right_open(); + } return ret; } @@ -2269,11 +2288,14 @@ int ObMemtable::split_ranges_for_sample(const blocksstable::ObDatumRange &table_ while (!split_succ && total_split_range_count > ObMemtableRowSampleIterator::SAMPLE_MEMTABLE_RANGE_COUNT) { int tmp_ret = OB_SUCCESS; sample_memtable_ranges.reuse(); - if (OB_TMP_FAIL(try_split_range_for_sample_(table_scan_range.get_start_key().get_store_rowkey(), - table_scan_range.get_end_key().get_store_rowkey(), - total_split_range_count, - allocator, - sample_memtable_ranges))) { + ObStoreRange input_range; + input_range.set_start_key(table_scan_range.get_start_key().get_store_rowkey()); + input_range.set_end_key(table_scan_range.get_end_key().get_store_rowkey()); + input_range.is_left_open() ? input_range.set_left_open() : input_range.set_left_closed(); + input_range.is_right_open() ? input_range.set_right_open() : input_range.set_right_closed(); + + if (OB_TMP_FAIL( + try_split_range_for_sample_(input_range, total_split_range_count, allocator, sample_memtable_ranges))) { total_split_range_count = total_split_range_count / 10; TRANS_LOG(WARN, "try split range for sampling failed, shrink split range count and retry", @@ -2294,19 +2316,18 @@ int ObMemtable::split_ranges_for_sample(const blocksstable::ObDatumRange &table_ return ret; } -int64_t ObMemtable::try_split_range_for_sample_(const ObStoreRowkey &start_key, - const ObStoreRowkey &end_key, +int64_t ObMemtable::try_split_range_for_sample_(const ObStoreRange &input_range, const int64_t range_count, ObIAllocator &allocator, ObIArray &sample_memtable_ranges) { int ret = OB_SUCCESS; ObSEArray store_range_array; - if (OB_FAIL(get_split_ranges(&start_key, &end_key, range_count, store_range_array))) { + if (OB_FAIL(get_split_ranges(input_range, range_count, store_range_array))) { TRANS_LOG(WARN, "try split ranges for sample failed", KR(ret)); } else if (store_range_array.count() != range_count) { - ret = OB_ERR_UNEXPECTED; - TRANS_LOG(ERROR, "store array count is not equal with range_count", KR(ret), K(range_count), KPC(this)); + ret = OB_ENTRY_NOT_EXIST; + TRANS_LOG(INFO, "memtable row is not enough for splitting", KR(ret), K(range_count), KPC(this)); } else { const int64_t range_count_each_chosen = range_count / (ObMemtableRowSampleIterator::SAMPLE_MEMTABLE_RANGE_COUNT - 1); diff --git a/src/storage/memtable/ob_memtable.h b/src/storage/memtable/ob_memtable.h index b7845c974..5a02ae1a9 100644 --- a/src/storage/memtable/ob_memtable.h +++ b/src/storage/memtable/ob_memtable.h @@ -394,7 +394,9 @@ public: void set_contain_hotspot_row() { return ATOMIC_STORE(&contain_hotspot_row_, true); } virtual int64_t get_upper_trans_version() const override; virtual int estimate_phy_size(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, int64_t& total_bytes, int64_t& total_rows) override; - virtual int get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, const int64_t part_cnt, common::ObIArray &range_array) override; + virtual int get_split_ranges(const ObStoreRange &input_range, + const int64_t part_cnt, + ObIArray &range_array) override; int split_ranges_for_sample(const blocksstable::ObDatumRange &table_scan_range, const double sample_rate_percentage, ObIAllocator &allocator, @@ -573,8 +575,7 @@ private: const int64_t last_compact_cnt, const int64_t total_trans_node_count); bool ready_for_flush_(); - int64_t try_split_range_for_sample_(const ObStoreRowkey &start_key, - const ObStoreRowkey &end_key, + int64_t try_split_range_for_sample_(const ObStoreRange &input_range, const int64_t range_count, ObIAllocator &allocator, ObIArray &sample_memtable_ranges); diff --git a/src/storage/memtable/ob_memtable_interface.h b/src/storage/memtable/ob_memtable_interface.h index cfce5d087..94b66949c 100644 --- a/src/storage/memtable/ob_memtable_interface.h +++ b/src/storage/memtable/ob_memtable_interface.h @@ -142,22 +142,18 @@ public: total_rows = 0; return OB_SUCCESS; } - virtual int get_split_ranges(const ObStoreRowkey *start_key, - const ObStoreRowkey *end_key, + + virtual int get_split_ranges(const ObStoreRange &input_range, const int64_t part_cnt, - common::ObIArray &range_array) + ObIArray &range_array) { - UNUSEDx(start_key, end_key); + UNUSEDx(input_range); int ret = OB_SUCCESS; if (OB_UNLIKELY(part_cnt != 1)) { ret = OB_NOT_SUPPORTED; - } else { - ObStoreRange merge_range; - merge_range.set_start_key(ObStoreRowkey::MIN_STORE_ROWKEY); - merge_range.set_end_key(ObStoreRowkey::MAX_STORE_ROWKEY); - if (OB_FAIL(range_array.push_back(merge_range))) { - TRANS_LOG(ERROR, "push back to range array failed", K(ret)); - } + STORAGE_LOG(WARN, "split a single range is not supported", KR(ret), K(input_range), K(part_cnt)); + } else if (OB_FAIL(range_array.push_back(input_range))) { + STORAGE_LOG(WARN, "push back to range array failed", K(ret)); } return ret; } diff --git a/src/storage/ob_partition_range_spliter.cpp b/src/storage/ob_partition_range_spliter.cpp index dd80aaf31..92ad5fd7c 100644 --- a/src/storage/ob_partition_range_spliter.cpp +++ b/src/storage/ob_partition_range_spliter.cpp @@ -1097,37 +1097,20 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info, ObSEArray store_ranges; memtable::ObMemtable *memtable = static_cast(table); - if (OB_FAIL(memtable->get_split_ranges( - &range_info.store_range_->get_start_key(), - &range_info.store_range_->get_end_key(), - range_info.parallel_target_count_, - store_ranges))) { - if (OB_ENTRY_NOT_EXIST == ret) { - STORAGE_LOG(WARN, "Failed to get split ranges from memtable, build single range instead", K(ret)); - if (OB_FAIL(build_single_range(false/*for compaction*/, range_info, allocator, range_array))) { - STORAGE_LOG(WARN, "Failed to build single range", K(ret)); - } else { - STORAGE_LOG(DEBUG, "try to make single split range for memtable", K(range_info), K(range_array)); - } - } else { - STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret)); - } + if (OB_ISNULL(memtable)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "Unexpected null memtable", K(ret), KP(memtable), K(range_info)); + } else if (OB_FAIL(memtable->get_split_ranges( + *range_info.store_range_, range_info.parallel_target_count_, store_ranges))) { + STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret)); } else { ObStoreRange store_range; for (int64_t i = 0; OB_SUCC(ret) && i < store_ranges.count(); i++) { if (OB_FAIL(store_ranges.at(i).deep_copy(allocator, store_range))) { STORAGE_LOG(WARN, "Failed to deep copy store range", K(ret), K(store_ranges)); } else if (FALSE_IT(store_range.set_table_id(range_info.store_range_->get_table_id()))) { - } else { - if (i == 0 && range_info.store_range_->get_border_flag().inclusive_start()) { - store_range.set_left_closed(); - } - if (i == store_ranges.count() - 1 && range_info.store_range_->get_border_flag().inclusive_end()) { - store_range.set_right_closed(); - } - if (OB_FAIL(range_array.push_back(store_range))) { - STORAGE_LOG(WARN, "Failed to push back store range", K(ret), K(store_range)); - } + } else if (OB_FAIL(range_array.push_back(store_range))) { + STORAGE_LOG(WARN, "Failed to push back store range", K(ret), K(store_range)); } } } diff --git a/src/storage/tx_table/ob_tx_data_memtable.cpp b/src/storage/tx_table/ob_tx_data_memtable.cpp index 68cfced3f..5d0b659a7 100644 --- a/src/storage/tx_table/ob_tx_data_memtable.cpp +++ b/src/storage/tx_table/ob_tx_data_memtable.cpp @@ -659,13 +659,11 @@ int ObTxDataMemtable::estimate_phy_size(const ObStoreRowkey *start_key, return ret; } -int ObTxDataMemtable::get_split_ranges(const ObStoreRowkey *start_key, - const ObStoreRowkey *end_key, +int ObTxDataMemtable::get_split_ranges(const ObStoreRange &input_range, const int64_t part_cnt, common::ObIArray &range_array) { - UNUSED(start_key); - UNUSED(end_key); + UNUSED(input_range); int ret = OB_SUCCESS; if (!pre_process_done_) { diff --git a/src/storage/tx_table/ob_tx_data_memtable.h b/src/storage/tx_table/ob_tx_data_memtable.h index f8310b1f9..f875ec13d 100644 --- a/src/storage/tx_table/ob_tx_data_memtable.h +++ b/src/storage/tx_table/ob_tx_data_memtable.h @@ -297,8 +297,7 @@ public: /* derived from ObIMemtable */ int64_t &total_bytes, int64_t &total_rows) override; - virtual int get_split_ranges(const ObStoreRowkey *start_key, - const ObStoreRowkey *end_key, + virtual int get_split_ranges(const ObStoreRange &input_range, const int64_t part_cnt, common::ObIArray &range_array) override; // not supported