[CP] [FIX] make split_range feel range closed or opened
This commit is contained in:
parent
6c172afeae
commit
430086798f
@ -412,7 +412,9 @@ void TestTxDataTable::do_basic_test()
|
||||
int64_t inserted_cnt_after_pre_process = freezing_memtable->get_tx_data_count();
|
||||
ASSERT_EQ(inserted_cnt_before_pre_process + 1, inserted_cnt_after_pre_process);
|
||||
|
||||
ASSERT_EQ(OB_SUCCESS, freezing_memtable->get_split_ranges(nullptr, nullptr, range_cnt, range_array));
|
||||
ObStoreRange input_range;
|
||||
input_range.set_whole_range();
|
||||
ASSERT_EQ(OB_SUCCESS, freezing_memtable->get_split_ranges(input_range, range_cnt, range_array));
|
||||
int64_t pre_range_end_key = 0;
|
||||
for (int i = 0; i < range_cnt; i++) {
|
||||
auto &range = range_array[i];
|
||||
|
@ -159,14 +159,14 @@ int ObTableEstimator::estimate_multi_scan_row_count(
|
||||
ObPartitionEst sub_range_cost;
|
||||
const static int64_t sub_range_cnt = 3;
|
||||
ObSEArray<ObStoreRange, sub_range_cnt> store_ranges;
|
||||
if (OB_FAIL((static_cast<memtable::ObMemtable*>(current_table))->get_split_ranges(
|
||||
&range.get_start_key().get_store_rowkey(),
|
||||
&range.get_end_key().get_store_rowkey(),
|
||||
sub_range_cnt,
|
||||
store_ranges))) {
|
||||
if (OB_ENTRY_NOT_EXIST != ret) {
|
||||
LOG_WARN("Failed to split ranges", K(ret), K(tmp_cost));
|
||||
}
|
||||
ObStoreRange input_range;
|
||||
input_range.set_start_key(range.get_start_key().get_store_rowkey());
|
||||
input_range.set_end_key(range.get_end_key().get_store_rowkey());
|
||||
range.is_left_closed() ? input_range.set_left_closed() : input_range.set_left_open();
|
||||
range.is_right_closed() ? input_range.set_right_closed() : input_range.set_right_open();
|
||||
if (OB_FAIL((static_cast<memtable::ObMemtable *>(current_table))
|
||||
->get_split_ranges(input_range, sub_range_cnt, store_ranges))) {
|
||||
LOG_WARN("Failed to split ranges", K(ret), K(tmp_cost));
|
||||
} else if (store_ranges.count() > 1) {
|
||||
LOG_TRACE("estimated logical row count may be not right, split range and do estimating again", K(tmp_cost), K(store_ranges));
|
||||
common::ObArenaAllocator allocator("OB_STORAGE_EST", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
|
||||
|
@ -287,22 +287,23 @@ int ObParallelMergeCtx::init_parallel_mini_merge(compaction::ObBasicTabletMergeC
|
||||
ObArray<ObStoreRange> store_ranges;
|
||||
store_ranges.set_attr(lib::ObMemAttr(MTL_ID(), "TmpMiniRanges", ObCtxIds::MERGE_NORMAL_CTX_ID));
|
||||
|
||||
ObStoreRange input_range;
|
||||
input_range.set_whole_range();
|
||||
if (concurrent_cnt_ <= 1) {
|
||||
if (OB_FAIL(init_serial_merge())) {
|
||||
STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret));
|
||||
}
|
||||
} else if (OB_FAIL(memtable->get_split_ranges(nullptr, nullptr, concurrent_cnt_, store_ranges))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
} else if (OB_FAIL(memtable->get_split_ranges(input_range, concurrent_cnt_, store_ranges))) {
|
||||
STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret));
|
||||
} else if (OB_UNLIKELY(store_ranges.count() != concurrent_cnt_)) {
|
||||
if (1 == store_ranges.count()) {
|
||||
if (OB_FAIL(init_serial_merge())) {
|
||||
STORAGE_LOG(WARN, "Failed to init serialize merge", K(ret));
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret));
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected range array and concurrent_cnt", K(ret), K_(concurrent_cnt), K(store_ranges));
|
||||
}
|
||||
} else if (OB_UNLIKELY(store_ranges.count() != concurrent_cnt_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected range array and concurrent_cnt", K(ret), K_(concurrent_cnt),
|
||||
K(store_ranges));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_ranges.count(); i++) {
|
||||
ObDatumRange datum_range;
|
||||
|
@ -321,7 +321,7 @@ int ObMemtable::safe_to_destroy(bool &is_safe)
|
||||
if (!is_safe && ret == OB_STATE_NOT_MATCH) {
|
||||
ret = OB_SUCCESS;
|
||||
bool is_done = false;
|
||||
share::LSN end_lsn;
|
||||
LSN end_lsn;
|
||||
if (OB_FAIL(MTL(logservice::ObLogService*)->get_log_apply_service()->
|
||||
is_apply_done(ls_handle_.get_ls()->get_ls_id(),
|
||||
is_done,
|
||||
@ -2201,17 +2201,17 @@ int ObMemtable::estimate_phy_size(const ObStoreRowkey* start_key, const ObStoreR
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObMemtable::get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, const int64_t part_cnt, ObIArray<ObStoreRange> &range_array)
|
||||
int ObMemtable::get_split_ranges(const ObStoreRange &input_range,
|
||||
const int64_t part_cnt,
|
||||
ObIArray<ObStoreRange> &range_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
range_array.reuse();
|
||||
const ObStoreRowkey *start_key = &input_range.get_start_key();
|
||||
const ObStoreRowkey *end_key = &input_range.get_end_key();
|
||||
ObMemtableKey start_mtk;
|
||||
ObMemtableKey end_mtk;
|
||||
if (NULL == start_key) {
|
||||
start_key = &ObStoreRowkey::MIN_STORE_ROWKEY;
|
||||
}
|
||||
if (NULL == end_key) {
|
||||
end_key = &ObStoreRowkey::MAX_STORE_ROWKEY;
|
||||
}
|
||||
|
||||
if (part_cnt < 1) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
TRANS_LOG(WARN, "part cnt need be greater than 1", K(ret), K(part_cnt));
|
||||
@ -2220,6 +2220,25 @@ int ObMemtable::get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRo
|
||||
} else if (OB_FAIL(query_engine_.split_range(&start_mtk, &end_mtk, part_cnt, range_array))) {
|
||||
TRANS_LOG(WARN, "estimate row count fail", K(ret), K_(key));
|
||||
}
|
||||
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
// construct a single range if split failed
|
||||
ret = OB_SUCCESS;
|
||||
ObStoreRange merge_range;
|
||||
merge_range.set_start_key(*start_key);
|
||||
merge_range.set_end_key(*end_key);
|
||||
if (OB_FAIL(range_array.push_back(merge_range))) {
|
||||
STORAGE_LOG(WARN, "push back merge range to range array failed", KR(ret), K(merge_range));
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !range_array.empty()) {
|
||||
// set range closed or open
|
||||
ObStoreRange &first_range = range_array.at(0);
|
||||
ObStoreRange &last_range = range_array.at(range_array.count() - 1);
|
||||
input_range.get_border_flag().inclusive_start() ? first_range.set_left_closed() : first_range.set_left_open();
|
||||
input_range.get_border_flag().inclusive_end() ? last_range.set_right_closed() : last_range.set_right_open();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -2269,11 +2288,14 @@ int ObMemtable::split_ranges_for_sample(const blocksstable::ObDatumRange &table_
|
||||
while (!split_succ && total_split_range_count > ObMemtableRowSampleIterator::SAMPLE_MEMTABLE_RANGE_COUNT) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
sample_memtable_ranges.reuse();
|
||||
if (OB_TMP_FAIL(try_split_range_for_sample_(table_scan_range.get_start_key().get_store_rowkey(),
|
||||
table_scan_range.get_end_key().get_store_rowkey(),
|
||||
total_split_range_count,
|
||||
allocator,
|
||||
sample_memtable_ranges))) {
|
||||
ObStoreRange input_range;
|
||||
input_range.set_start_key(table_scan_range.get_start_key().get_store_rowkey());
|
||||
input_range.set_end_key(table_scan_range.get_end_key().get_store_rowkey());
|
||||
input_range.is_left_open() ? input_range.set_left_open() : input_range.set_left_closed();
|
||||
input_range.is_right_open() ? input_range.set_right_open() : input_range.set_right_closed();
|
||||
|
||||
if (OB_TMP_FAIL(
|
||||
try_split_range_for_sample_(input_range, total_split_range_count, allocator, sample_memtable_ranges))) {
|
||||
total_split_range_count = total_split_range_count / 10;
|
||||
TRANS_LOG(WARN,
|
||||
"try split range for sampling failed, shrink split range count and retry",
|
||||
@ -2294,19 +2316,18 @@ int ObMemtable::split_ranges_for_sample(const blocksstable::ObDatumRange &table_
|
||||
return ret;
|
||||
}
|
||||
|
||||
int64_t ObMemtable::try_split_range_for_sample_(const ObStoreRowkey &start_key,
|
||||
const ObStoreRowkey &end_key,
|
||||
int64_t ObMemtable::try_split_range_for_sample_(const ObStoreRange &input_range,
|
||||
const int64_t range_count,
|
||||
ObIAllocator &allocator,
|
||||
ObIArray<blocksstable::ObDatumRange> &sample_memtable_ranges)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObStoreRange, 64> store_range_array;
|
||||
if (OB_FAIL(get_split_ranges(&start_key, &end_key, range_count, store_range_array))) {
|
||||
if (OB_FAIL(get_split_ranges(input_range, range_count, store_range_array))) {
|
||||
TRANS_LOG(WARN, "try split ranges for sample failed", KR(ret));
|
||||
} else if (store_range_array.count() != range_count) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "store array count is not equal with range_count", KR(ret), K(range_count), KPC(this));
|
||||
ret = OB_ENTRY_NOT_EXIST;
|
||||
TRANS_LOG(INFO, "memtable row is not enough for splitting", KR(ret), K(range_count), KPC(this));
|
||||
} else {
|
||||
const int64_t range_count_each_chosen =
|
||||
range_count / (ObMemtableRowSampleIterator::SAMPLE_MEMTABLE_RANGE_COUNT - 1);
|
||||
|
@ -394,7 +394,9 @@ public:
|
||||
void set_contain_hotspot_row() { return ATOMIC_STORE(&contain_hotspot_row_, true); }
|
||||
virtual int64_t get_upper_trans_version() const override;
|
||||
virtual int estimate_phy_size(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, int64_t& total_bytes, int64_t& total_rows) override;
|
||||
virtual int get_split_ranges(const ObStoreRowkey* start_key, const ObStoreRowkey* end_key, const int64_t part_cnt, common::ObIArray<common::ObStoreRange> &range_array) override;
|
||||
virtual int get_split_ranges(const ObStoreRange &input_range,
|
||||
const int64_t part_cnt,
|
||||
ObIArray<ObStoreRange> &range_array) override;
|
||||
int split_ranges_for_sample(const blocksstable::ObDatumRange &table_scan_range,
|
||||
const double sample_rate_percentage,
|
||||
ObIAllocator &allocator,
|
||||
@ -573,8 +575,7 @@ private:
|
||||
const int64_t last_compact_cnt,
|
||||
const int64_t total_trans_node_count);
|
||||
bool ready_for_flush_();
|
||||
int64_t try_split_range_for_sample_(const ObStoreRowkey &start_key,
|
||||
const ObStoreRowkey &end_key,
|
||||
int64_t try_split_range_for_sample_(const ObStoreRange &input_range,
|
||||
const int64_t range_count,
|
||||
ObIAllocator &allocator,
|
||||
ObIArray<blocksstable::ObDatumRange> &sample_memtable_ranges);
|
||||
|
@ -142,22 +142,18 @@ public:
|
||||
total_rows = 0;
|
||||
return OB_SUCCESS;
|
||||
}
|
||||
virtual int get_split_ranges(const ObStoreRowkey *start_key,
|
||||
const ObStoreRowkey *end_key,
|
||||
|
||||
virtual int get_split_ranges(const ObStoreRange &input_range,
|
||||
const int64_t part_cnt,
|
||||
common::ObIArray<common::ObStoreRange> &range_array)
|
||||
ObIArray<ObStoreRange> &range_array)
|
||||
{
|
||||
UNUSEDx(start_key, end_key);
|
||||
UNUSEDx(input_range);
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(part_cnt != 1)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
} else {
|
||||
ObStoreRange merge_range;
|
||||
merge_range.set_start_key(ObStoreRowkey::MIN_STORE_ROWKEY);
|
||||
merge_range.set_end_key(ObStoreRowkey::MAX_STORE_ROWKEY);
|
||||
if (OB_FAIL(range_array.push_back(merge_range))) {
|
||||
TRANS_LOG(ERROR, "push back to range array failed", K(ret));
|
||||
}
|
||||
STORAGE_LOG(WARN, "split a single range is not supported", KR(ret), K(input_range), K(part_cnt));
|
||||
} else if (OB_FAIL(range_array.push_back(input_range))) {
|
||||
STORAGE_LOG(WARN, "push back to range array failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
@ -1097,37 +1097,20 @@ int ObPartitionRangeSpliter::split_ranges_memtable(ObRangeSplitInfo &range_info,
|
||||
ObSEArray<ObStoreRange, 16> store_ranges;
|
||||
memtable::ObMemtable *memtable = static_cast<memtable::ObMemtable *>(table);
|
||||
|
||||
if (OB_FAIL(memtable->get_split_ranges(
|
||||
&range_info.store_range_->get_start_key(),
|
||||
&range_info.store_range_->get_end_key(),
|
||||
range_info.parallel_target_count_,
|
||||
store_ranges))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
STORAGE_LOG(WARN, "Failed to get split ranges from memtable, build single range instead", K(ret));
|
||||
if (OB_FAIL(build_single_range(false/*for compaction*/, range_info, allocator, range_array))) {
|
||||
STORAGE_LOG(WARN, "Failed to build single range", K(ret));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "try to make single split range for memtable", K(range_info), K(range_array));
|
||||
}
|
||||
} else {
|
||||
STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret));
|
||||
}
|
||||
if (OB_ISNULL(memtable)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "Unexpected null memtable", K(ret), KP(memtable), K(range_info));
|
||||
} else if (OB_FAIL(memtable->get_split_ranges(
|
||||
*range_info.store_range_, range_info.parallel_target_count_, store_ranges))) {
|
||||
STORAGE_LOG(WARN, "Failed to get split ranges from memtable", K(ret));
|
||||
} else {
|
||||
ObStoreRange store_range;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < store_ranges.count(); i++) {
|
||||
if (OB_FAIL(store_ranges.at(i).deep_copy(allocator, store_range))) {
|
||||
STORAGE_LOG(WARN, "Failed to deep copy store range", K(ret), K(store_ranges));
|
||||
} else if (FALSE_IT(store_range.set_table_id(range_info.store_range_->get_table_id()))) {
|
||||
} else {
|
||||
if (i == 0 && range_info.store_range_->get_border_flag().inclusive_start()) {
|
||||
store_range.set_left_closed();
|
||||
}
|
||||
if (i == store_ranges.count() - 1 && range_info.store_range_->get_border_flag().inclusive_end()) {
|
||||
store_range.set_right_closed();
|
||||
}
|
||||
if (OB_FAIL(range_array.push_back(store_range))) {
|
||||
STORAGE_LOG(WARN, "Failed to push back store range", K(ret), K(store_range));
|
||||
}
|
||||
} else if (OB_FAIL(range_array.push_back(store_range))) {
|
||||
STORAGE_LOG(WARN, "Failed to push back store range", K(ret), K(store_range));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -659,13 +659,11 @@ int ObTxDataMemtable::estimate_phy_size(const ObStoreRowkey *start_key,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTxDataMemtable::get_split_ranges(const ObStoreRowkey *start_key,
|
||||
const ObStoreRowkey *end_key,
|
||||
int ObTxDataMemtable::get_split_ranges(const ObStoreRange &input_range,
|
||||
const int64_t part_cnt,
|
||||
common::ObIArray<common::ObStoreRange> &range_array)
|
||||
{
|
||||
UNUSED(start_key);
|
||||
UNUSED(end_key);
|
||||
UNUSED(input_range);
|
||||
int ret = OB_SUCCESS;
|
||||
|
||||
if (!pre_process_done_) {
|
||||
|
@ -297,8 +297,7 @@ public: /* derived from ObIMemtable */
|
||||
int64_t &total_bytes,
|
||||
int64_t &total_rows) override;
|
||||
|
||||
virtual int get_split_ranges(const ObStoreRowkey *start_key,
|
||||
const ObStoreRowkey *end_key,
|
||||
virtual int get_split_ranges(const ObStoreRange &input_range,
|
||||
const int64_t part_cnt,
|
||||
common::ObIArray<common::ObStoreRange> &range_array) override;
|
||||
// not supported
|
||||
|
Loading…
x
Reference in New Issue
Block a user