From 09be4dc7ee026a6a423877fe23260e7a97295fe2 Mon Sep 17 00:00:00 2001 From: Yongqiang YANG <98214048+dataroaring@users.noreply.github.com> Date: Wed, 20 Mar 2024 18:56:04 +0800 Subject: [PATCH] [fix](random-bucket) use a random tablet index when there is no cached value in memory (#32336) 1. In cloud mode, getting the visible version is an RPC to the meta service, and loads would fetch the visible version for all partitions; the initial tablet index is therefore now chosen randomly instead of being derived from the visible version. 2. VUnionNode (and the pipeline union source operator) should respect batch size: stop filling the block once it holds batch_size rows. --- .../pipeline/exec/union_source_operator.cpp | 2 +- be/src/vec/exec/vunion_node.cpp | 2 +- .../planner/TabletLoadIndexRecorderMgr.java | 5 +- ...st_insert_random_distribution_table.groovy | 290 ++++++++---------- .../test_load_to_single_tablet.groovy | 186 +++++++---- 5 files changed, 260 insertions(+), 225 deletions(-) diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index 43b0fb2762..9ddf726700 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -198,7 +198,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo auto& _const_expr_list_idx = local_state._const_expr_list_idx; vectorized::MutableBlock mblock = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); - for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size(); + for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size(); ++_const_expr_list_idx) { vectorized::Block tmp_block; tmp_block.insert({vectorized::ColumnUInt8::create(1), diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp index 2405373a06..30014cfc5b 100644 --- a/be/src/vec/exec/vunion_node.cpp +++ b/be/src/vec/exec/vunion_node.cpp @@ -161,7 +161,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) { MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor); Block 
child_block; - while (has_more_materialized() && mblock.rows() <= state->batch_size()) { + while (has_more_materialized() && mblock.rows() < state->batch_size()) { // The loop runs until we are either done iterating over the children that require // materialization, or the row batch is at capacity. DCHECK(!is_child_passthrough(_child_idx)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java index 3b14445653..685a57c55b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/TabletLoadIndexRecorderMgr.java @@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Triple; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.security.SecureRandom; import java.util.concurrent.ConcurrentHashMap; public class TabletLoadIndexRecorderMgr extends MasterDaemon { @@ -33,6 +34,7 @@ public class TabletLoadIndexRecorderMgr extends MasterDaemon { private static final long TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days private static final long TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS = 3600000; // 1 hour private static final int TIMES_FOR_UPDATE_TIMESTAMP = 1000; + private static final SecureRandom RANDOM = new SecureRandom(); // < -> load_tablet_record> // 0 =< load_tablet_index < number_buckets @@ -58,7 +60,8 @@ public class TabletLoadIndexRecorderMgr extends MasterDaemon { public int getCurrentTabletLoadIndex(long dbId, long tableId, Partition partition) throws UserException { Triple key = Triple.of(dbId, tableId, partition.getId()); return loadTabletRecordMap.compute(key, (k, existingRecord) -> - existingRecord == null ? new TabletLoadIndexRecord(partition.getVisibleVersion() - 1, + existingRecord == null ? 
new TabletLoadIndexRecord(RANDOM.nextInt( + partition.getDistributionInfo().getBucketNum()), partition.getDistributionInfo().getBucketNum()) : existingRecord).getAndIncrement(); } diff --git a/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy index ae9f81400a..f9e456b5d3 100644 --- a/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy +++ b/regression-test/suites/load_p0/insert/test_insert_random_distribution_table.groovy @@ -58,23 +58,24 @@ suite("test_insert_random_distribution_table", "p0") { def tablets = getTablets.call(res) - def tabletId1 = tablets[0] - def tabletId2 = tablets[1] - def tabletId3 = tablets[2] - def tabletId4 = tablets[3] - def tabletId5 = tablets[4] + // define an array to store count of each tablet + def rowCounts = [] - def rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" - def rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" - def rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" - def rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" - def rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})" + def beginIdx = -1 + for (int i = tablets.size() - 1; i >= 0; i--) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + rowCounts[i] = countResult[0][0] + log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}") + if (rowCounts[i] > 0 && (beginIdx == (i + 1) || beginIdx == -1)) { + beginIdx = i + } + } - assertEquals(rowCount1[0][0], 3) - assertEquals(rowCount2[0][0], 1) - assertEquals(rowCount3[0][0], 0) - assertEquals(rowCount4[0][0], 0) - assertEquals(rowCount5[0][0], 0) + assertEquals(rowCounts[beginIdx], 2) + assertEquals(rowCounts[(beginIdx + 1) % 5], 2) + assertEquals(rowCounts[(beginIdx + 2) % 5], 0) + assertEquals(rowCounts[(beginIdx + 3) % 5], 
0) + assertEquals(rowCounts[(beginIdx + 4) % 5], 0) sql "set batch_size=2" // insert second time @@ -83,17 +84,16 @@ suite("test_insert_random_distribution_table", "p0") { totalCount = sql "select count() from ${tableName}" assertEquals(totalCount[0][0], 8) - rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" - rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" - rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})" - - assertEquals(rowCount1[0][0], 3) - assertEquals(rowCount2[0][0], 4) - assertEquals(rowCount3[0][0], 1) - assertEquals(rowCount4[0][0], 0) - assertEquals(rowCount5[0][0], 0) + for (int i = 0; i < tablets.size(); i++) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + rowCounts[i] = countResult[0][0] + log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}") + } + assertEquals(rowCounts[(beginIdx + 0) % 5], 2) + assertEquals(rowCounts[(beginIdx + 1) % 5], 4) + assertEquals(rowCounts[(beginIdx + 2) % 5], 2) + assertEquals(rowCounts[(beginIdx + 3) % 5], 0) + assertEquals(rowCounts[(beginIdx + 4) % 5], 0) sql "set batch_size=2" // insert third time @@ -102,17 +102,17 @@ suite("test_insert_random_distribution_table", "p0") { totalCount = sql "select count() from ${tableName}" assertEquals(totalCount[0][0], 12) - rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})" - rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})" - rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})" + for (int i = 0; i < tablets.size(); i++) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + 
rowCounts[i] = countResult[0][0] + log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}") + } - assertEquals(rowCount1[0][0], 3) - assertEquals(rowCount2[0][0], 4) - assertEquals(rowCount3[0][0], 4) - assertEquals(rowCount4[0][0], 1) - assertEquals(rowCount5[0][0], 0) + assertEquals(rowCounts[(beginIdx + 0) % 5], 2) + assertEquals(rowCounts[(beginIdx + 1) % 5], 4) + assertEquals(rowCounts[(beginIdx + 2) % 5], 4) + assertEquals(rowCounts[(beginIdx + 3) % 5], 2) + assertEquals(rowCounts[(beginIdx + 4) % 5], 0) // ${tableName} partitioned table sql """ DROP TABLE IF EXISTS ${tableName} """ @@ -141,69 +141,45 @@ suite("test_insert_random_distribution_table", "p0") { sql "sync" totalCount = sql "select count() from ${tableName}" - def partition1 = "p20231011" - def partition2 = "p20231012" - def partition3 = "p20231013" assertEquals(totalCount[0][0], 5) - res = sql "show tablets from ${tableName} partition ${partition1}" - tablets = getTablets.call(res) - def tabletId11 = tablets[0] - def tabletId12 = tablets[1] - def tabletId13 = tablets[2] - def tabletId14 = tablets[3] - def tabletId15 = tablets[4] - res = sql "show tablets from ${tableName} partition ${partition2}" - tablets = getTablets.call(res) - def tabletId21 = tablets[0] - def tabletId22 = tablets[1] - def tabletId23 = tablets[2] - def tabletId24 = tablets[3] - def tabletId25 = tablets[4] + def partitions = ["p20231011", "p20231012", "p20231013"] + def partitionTablets = [] + def partitionRowCounts = [] + def partitionBeginIdx = []; + for (int p = 0; p < 3; p++) { + res = sql "show tablets from ${tableName} partition ${partitions[p]}" + partitionTablets[p] = getTablets.call(res) + partitionRowCounts[p] = [] + numTablets = partitionTablets[p].size() + for (int i = numTablets - 1; i >= 0; i--) { + def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})" + partitionRowCounts[p][i] = countResult[0][0] + log.info("tablet = ${partitionTablets[p][i]}, 
partitionRowCounts[${p}][${i}] = " + + "${partitionRowCounts[p][i]}") + if (partitionRowCounts[p][i] > 0 && + (partitionBeginIdx[p] == (i + 1) || partitionBeginIdx[p] == null)) { + partitionBeginIdx[p] = i + } + } + } - res = sql "show tablets from ${tableName} partition ${partition3}" - tablets = getTablets.call(res) - def tabletId31 = tablets[0] - def tabletId32 = tablets[1] - def tabletId33 = tablets[2] - def tabletId34 = tablets[3] - def tabletId35 = tablets[4] + assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 1) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 1) + for (int i = 3; i < 10; i++) { + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0) + } + + assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 1) + for (int i = 2; i < 10; i++) { + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0) + } - def rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" - def rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})" - def rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" - def rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})" - def rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" - - def rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" - def rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" - def rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" - def rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" - def rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" - - def rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})" - def rowCount32 = sql "select count() from ${tableName} 
tablet(${tabletId32})" - def rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" - def rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" - def rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" - - assertEquals(rowCount11[0][0], 2) - assertEquals(rowCount12[0][0], 1) - assertEquals(rowCount13[0][0], 0) - assertEquals(rowCount14[0][0], 0) - assertEquals(rowCount15[0][0], 0) - - assertEquals(rowCount21[0][0], 1) - assertEquals(rowCount22[0][0], 1) - assertEquals(rowCount23[0][0], 0) - assertEquals(rowCount24[0][0], 0) - assertEquals(rowCount25[0][0], 0) - - assertEquals(rowCount31[0][0], 0) - assertEquals(rowCount32[0][0], 0) - assertEquals(rowCount33[0][0], 0) - assertEquals(rowCount34[0][0], 0) - assertEquals(rowCount35[0][0], 0) + for (int i = 0; i < 10; i++) { + assertEquals(partitionRowCounts[2][i], 0) + } sql "set batch_size=1" // insert second time @@ -213,41 +189,42 @@ suite("test_insert_random_distribution_table", "p0") { totalCount = sql "select count() from ${tableName}" assertEquals(totalCount[0][0], 10) - rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" - rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})" - rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" - rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})" - rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" - rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" - rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" - rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" - rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" - rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" + for (int p = 0; p < 3; p++) { + numTablets = partitionTablets[p].size() + for (int i = numTablets - 1; i >= 0; i--) { + def countResult = 
sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})" + partitionRowCounts[p][i] = countResult[0][0] + if (p == 2) { + if ((partitionRowCounts[p][i]) > 0 && + (partitionBeginIdx[p] == (i + 1) || partitionBeginIdx[p] == null)) { + partitionBeginIdx[p] = i + } + } + log.info("tablet = ${partitionTablets[p][i]}, partitionRowCounts[${p}][${i}] = " + + "${partitionRowCounts[p][i]}") + } + } - rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})" - rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})" - rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" - rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" - rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" + assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 2) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 1) + for (int i = 3; i < 10; i++) { + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0) + } - assertEquals(rowCount11[0][0], 2) - assertEquals(rowCount12[0][0], 2) - assertEquals(rowCount13[0][0], 0) - assertEquals(rowCount14[0][0], 0) - assertEquals(rowCount15[0][0], 0) + assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 2) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 2) % 10], 1) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 3) % 10], 1) + for (int i = 4; i < 10; i++) { + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0) + } - assertEquals(rowCount21[0][0], 1) - assertEquals(rowCount22[0][0], 3) - assertEquals(rowCount23[0][0], 1) - assertEquals(rowCount24[0][0], 0) - assertEquals(rowCount25[0][0], 0) - - assertEquals(rowCount31[0][0], 0) - assertEquals(rowCount32[0][0], 1) - assertEquals(rowCount33[0][0], 0) - 
assertEquals(rowCount34[0][0], 0) - assertEquals(rowCount35[0][0], 0) + assertEquals(partitionRowCounts[2][partitionBeginIdx[2]], 1) + for (int i = 1; i < 10; i++) { + assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + i) % 10], 0) + } sql "set batch_size=1" // insert third time @@ -258,39 +235,36 @@ suite("test_insert_random_distribution_table", "p0") { totalCount = sql "select count() from ${tableName}" assertEquals(totalCount[0][0], 16) - rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})" - rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})" - rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})" - rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})" - rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})" + for (int p = 0; p < 3; p++) { + for (int i = 0; i < partitionTablets[p].size(); i++) { + def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})" + partitionRowCounts[p][i] = countResult[0][0] + log.info("tablet = ${partitionTablets[p][i]}, partitionRowCounts[${p}][${i}] = " + + "${partitionRowCounts[p][i]}") + } + } - rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})" - rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})" - rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})" - rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})" - rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})" + assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 2) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 2) + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 3) % 10], 1) + for (int i = 4; i < 10; i++) { + assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0) + } - rowCount31 = sql "select count() 
from ${tableName} tablet(${tabletId31})" - rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})" - rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})" - rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})" - rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})" + assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 2) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 2) % 10], 2) + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 3) % 10], 1) + for (int i = 4; i < 10; i++) { + assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0) + } - assertEquals(rowCount11[0][0], 2) - assertEquals(rowCount12[0][0], 2) - assertEquals(rowCount13[0][0], 2) - assertEquals(rowCount14[0][0], 0) - assertEquals(rowCount15[0][0], 0) - - assertEquals(rowCount21[0][0], 1) - assertEquals(rowCount22[0][0], 3) - assertEquals(rowCount23[0][0], 2) - assertEquals(rowCount24[0][0], 0) - assertEquals(rowCount25[0][0], 0) - - assertEquals(rowCount31[0][0], 0) - assertEquals(rowCount32[0][0], 1) - assertEquals(rowCount33[0][0], 2) - assertEquals(rowCount34[0][0], 1) - assertEquals(rowCount35[0][0], 0) + assertEquals(partitionRowCounts[2][partitionBeginIdx[2]], 1) + assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 1) % 10], 1) + assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 2) % 10], 1) + assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 3) % 10], 1) + for (int i = 4; i < 10; i++) { + assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + i) % 10], 0) + } } diff --git a/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy index 80b25bd34c..02f6490032 100644 --- a/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy +++ 
b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy @@ -57,17 +57,28 @@ suite("test_load_to_single_tablet", "p0") { assertEquals(10, totalCount[0][0]) String[][] res = sql "show tablets from ${tableName}" res = deduplicate_tablets(res) - def tablet1 = res[0][0] - def tablet2 = res[1][0] - def tablet3 = res[2][0] - def rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - def rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - def rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})" - assertEquals(10, rowCount1[0][0]) - assertEquals(0, rowCount2[0][0]) - assertEquals(0, rowCount3[0][0]) - + def tablets = [] + for (int i = 0; i < res.size(); i++) { + tablets.add(res[i][0]) + } + + def beginIdx = -1 + def rowCounts = [] + + for (int i = tablets.size() - 1; i >= 0; i--) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + rowCounts[i] = countResult[0][0] + log.info("tablet: ${tablets[i]}, rowCount: ${rowCounts[i]}") + if (rowCounts[i] > 0 && (beginIdx == -1 || beginIdx == i + 1)) { + beginIdx = i; + } + } + + assertEquals(10, rowCounts[beginIdx]) + for (int i = 1; i < tablets.size(); i++) { + assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()]) + } // load second time streamLoad { @@ -82,13 +93,19 @@ suite("test_load_to_single_tablet", "p0") { } sql "sync" totalCount = sql "select count() from ${tableName}" - rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})" assertEquals(20, totalCount[0][0]) - assertEquals(10, rowCount1[0][0]) - assertEquals(10, rowCount2[0][0]) - assertEquals(0, rowCount3[0][0]) + + for (int i = 0; i < tablets.size(); i++) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + rowCounts[i] = countResult[0][0] + log.info("tablet: 
${tablets[i]}, rowCount: ${rowCounts[i]}") + } + + assertEquals(10, rowCounts[beginIdx]) + assertEquals(10, rowCounts[(beginIdx + 1) % tablets.size()]) + for (int i = 2; i < tablets.size(); i++) { + assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()]) + } // load third time streamLoad { @@ -103,13 +120,21 @@ suite("test_load_to_single_tablet", "p0") { } sql "sync" totalCount = sql "select count() from ${tableName}" - rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})" assertEquals(30, totalCount[0][0]) - assertEquals(10, rowCount1[0][0]) - assertEquals(10, rowCount2[0][0]) - assertEquals(10, rowCount3[0][0]) + + for (int i = 0; i < tablets.size(); i++) { + def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})" + rowCounts[i] = countResult[0][0] + log.info("tablet: ${tablets[i]}, rowCount: ${rowCounts[i]}") + } + + assertEquals(10, rowCounts[beginIdx]) + assertEquals(10, rowCounts[(beginIdx + 1) % tablets.size()]) + assertEquals(10, rowCounts[(beginIdx + 2) % tablets.size()]) + + for (int i = 3; i < tablets.size(); i++) { + assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()]) + } // test partitioned table tableName = "test_load_to_single_tablet_partitioned" @@ -148,29 +173,43 @@ suite("test_load_to_single_tablet", "p0") { } sql "sync" + totalCount = sql "select count() from ${tableName}" assertEquals(10, totalCount[0][0]) + + def partitionTablets = [] + def partitionRowCounts = [] + def partitionBeginIdx = [] res = sql "show tablets from ${tableName} partitions(p20231011, p20231012)" res = deduplicate_tablets(res) - tablet1 = res[0][0] - tablet2 = res[1][0] - tablet3 = res[2][0] - tablet4 = res[10][0] - tablet5 = res[11][0] - tablet6 = res[12][0] + for (int i = 0; i < res.size(); i++) { + if (i % 10 == 0) { + partitionTablets[i/10] = [] + partitionRowCounts[i/10] = [] + 
} + partitionTablets[i/10][i%10] = res[i][0] + } - rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})" - def rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})" - def rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})" - def rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})" - assertEquals(5, rowCount1[0][0]) - assertEquals(0, rowCount2[0][0]) - assertEquals(0, rowCount3[0][0]) - assertEquals(5, rowCount4[0][0]) - assertEquals(0, rowCount5[0][0]) - assertEquals(0, rowCount6[0][0]) + for (int i = 0; i < partitionTablets.size(); i++) { + for (int j = partitionTablets[i].size() - 1; j >= 0; j--) { + def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})" + partitionRowCounts[i][j] = countResult[0][0] + log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}") + if (partitionRowCounts[i][j] > 0 && + (partitionBeginIdx[i] == null || partitionBeginIdx[i] == j + 1)) { + partitionBeginIdx[i] = j + } + } + } + + assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]]) + for (int i = 1; i < partitionTablets[0].size(); i++) { + assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()]) + } + assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]]) + for (int i = 1; i < partitionTablets[1].size(); i++) { + assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()]) + } // load second time streamLoad { @@ -185,19 +224,27 @@ suite("test_load_to_single_tablet", "p0") { } sql "sync" totalCount = sql "select count() from ${tableName}" - rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - rowCount3 = sql "select count() from ${tableName} 
tablet(${tablet3})" - rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})" - rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})" - rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})" assertEquals(20, totalCount[0][0]) - assertEquals(5, rowCount1[0][0]) - assertEquals(5, rowCount2[0][0]) - assertEquals(0, rowCount3[0][0]) - assertEquals(5, rowCount4[0][0]) - assertEquals(5, rowCount5[0][0]) - assertEquals(0, rowCount6[0][0]) + + for (int i = 0; i < partitionTablets.size(); i++) { + for (int j = 0; j < partitionTablets[i].size(); j++) { + def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})" + partitionRowCounts[i][j] = countResult[0][0] + log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}") + } + } + + assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]]) + assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 1) % partitionTablets[0].size()]) + for (int i = 2; i < partitionTablets[0].size(); i++) { + assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()]) + } + + assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]]) + assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 1) % partitionTablets[1].size()]) + for (int i = 2; i < partitionTablets[1].size(); i++) { + assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()]) + } // load third time streamLoad { @@ -212,18 +259,29 @@ suite("test_load_to_single_tablet", "p0") { } sql "sync" totalCount = sql "select count() from ${tableName}" - rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})" - rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})" - rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})" - rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})" - rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})" - 
rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})" assertEquals(30, totalCount[0][0]) - assertEquals(5, rowCount1[0][0]) - assertEquals(5, rowCount2[0][0]) - assertEquals(5, rowCount3[0][0]) - assertEquals(5, rowCount4[0][0]) - assertEquals(5, rowCount5[0][0]) - assertEquals(5, rowCount6[0][0]) + + for (int i = 0; i < partitionTablets.size(); i++) { + for (int j = 0; j < partitionTablets[i].size(); j++) { + def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})" + partitionRowCounts[i][j] = countResult[0][0] + log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}") + } + } + + assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]]) + assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 1) % partitionTablets[0].size()]) + assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 2) % partitionTablets[0].size()]) + + for (int i = 3; i < partitionTablets[0].size(); i++) { + assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()]) + } + + assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]]) + assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 1) % partitionTablets[1].size()]) + assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 2) % partitionTablets[1].size()]) + for (int i = 3; i < partitionTablets[1].size(); i++) { + assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()]) + } }