[fix](random-bucket) tabletindex when there is no cached value in memory (#32336)

1. In cloud mode, get visible version is a rpc to metaservice, while
loads would get visible version for all partitions.
2. VunionNode should follow batch size.
This commit is contained in:
Yongqiang YANG
2024-03-20 18:56:04 +08:00
committed by yiguolei
parent 06bf5541f2
commit 09be4dc7ee
5 changed files with 260 additions and 225 deletions

View File

@ -198,7 +198,7 @@ Status UnionSourceOperatorX::get_next_const(RuntimeState* state, vectorized::Blo
auto& _const_expr_list_idx = local_state._const_expr_list_idx;
vectorized::MutableBlock mblock =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size();
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() < state->batch_size();
++_const_expr_list_idx) {
vectorized::Block tmp_block;
tmp_block.insert({vectorized::ColumnUInt8::create(1),

View File

@ -161,7 +161,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
Block child_block;
while (has_more_materialized() && mblock.rows() <= state->batch_size()) {
while (has_more_materialized() && mblock.rows() < state->batch_size()) {
// The loop runs until we are either done iterating over the children that require
// materialization, or the row batch is at capacity.
DCHECK(!is_child_passthrough(_child_idx));

View File

@ -26,6 +26,7 @@ import org.apache.commons.lang3.tuple.Triple;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
public class TabletLoadIndexRecorderMgr extends MasterDaemon {
@ -33,6 +34,7 @@ public class TabletLoadIndexRecorderMgr extends MasterDaemon {
private static final long TABLET_LOAD_INDEX_KEEP_MAX_TIME_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days
private static final long TABLET_LOAD_INDEX_EXPIRE_CHECK_INTERVAL_MS = 3600000; // 1 hour
private static final int TIMES_FOR_UPDATE_TIMESTAMP = 1000;
private static final SecureRandom RANDOM = new SecureRandom();
// <<db_id, table_id, partition_id> -> load_tablet_record>
// 0 =< load_tablet_index < number_buckets
@ -58,7 +60,8 @@ public class TabletLoadIndexRecorderMgr extends MasterDaemon {
public int getCurrentTabletLoadIndex(long dbId, long tableId, Partition partition) throws UserException {
Triple<Long, Long, Long> key = Triple.of(dbId, tableId, partition.getId());
return loadTabletRecordMap.compute(key, (k, existingRecord) ->
existingRecord == null ? new TabletLoadIndexRecord(partition.getVisibleVersion() - 1,
existingRecord == null ? new TabletLoadIndexRecord(RANDOM.nextInt(
partition.getDistributionInfo().getBucketNum()),
partition.getDistributionInfo().getBucketNum()) : existingRecord).getAndIncrement();
}

View File

@ -58,23 +58,24 @@ suite("test_insert_random_distribution_table", "p0") {
def tablets = getTablets.call(res)
def tabletId1 = tablets[0]
def tabletId2 = tablets[1]
def tabletId3 = tablets[2]
def tabletId4 = tablets[3]
def tabletId5 = tablets[4]
// define an array to store count of each tablet
def rowCounts = []
def rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
def rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
def rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
def rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
def rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
def beginIdx = -1
for (int i = tablets.size() - 1; i >= 0; i--) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}")
if (rowCounts[i] > 0 && (beginIdx == (i + 1) || beginIdx == -1)) {
beginIdx = i
}
}
assertEquals(rowCount1[0][0], 3)
assertEquals(rowCount2[0][0], 1)
assertEquals(rowCount3[0][0], 0)
assertEquals(rowCount4[0][0], 0)
assertEquals(rowCount5[0][0], 0)
assertEquals(rowCounts[beginIdx], 2)
assertEquals(rowCounts[(beginIdx + 1) % 5], 2)
assertEquals(rowCounts[(beginIdx + 2) % 5], 0)
assertEquals(rowCounts[(beginIdx + 3) % 5], 0)
assertEquals(rowCounts[(beginIdx + 4) % 5], 0)
sql "set batch_size=2"
// insert second time
@ -83,17 +84,16 @@ suite("test_insert_random_distribution_table", "p0") {
totalCount = sql "select count() from ${tableName}"
assertEquals(totalCount[0][0], 8)
rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
assertEquals(rowCount1[0][0], 3)
assertEquals(rowCount2[0][0], 4)
assertEquals(rowCount3[0][0], 1)
assertEquals(rowCount4[0][0], 0)
assertEquals(rowCount5[0][0], 0)
for (int i = 0; i < tablets.size(); i++) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}")
}
assertEquals(rowCounts[(beginIdx + 0) % 5], 2)
assertEquals(rowCounts[(beginIdx + 1) % 5], 4)
assertEquals(rowCounts[(beginIdx + 2) % 5], 2)
assertEquals(rowCounts[(beginIdx + 3) % 5], 0)
assertEquals(rowCounts[(beginIdx + 4) % 5], 0)
sql "set batch_size=2"
// insert third time
@ -102,17 +102,17 @@ suite("test_insert_random_distribution_table", "p0") {
totalCount = sql "select count() from ${tableName}"
assertEquals(totalCount[0][0], 12)
rowCount1 = sql "select count() from ${tableName} tablet(${tabletId1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tabletId2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tabletId3})"
rowCount4 = sql "select count() from ${tableName} tablet(${tabletId4})"
rowCount5 = sql "select count() from ${tableName} tablet(${tabletId5})"
for (int i = 0; i < tablets.size(); i++) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet = ${tablets[i]}, rowCounts[${i}] = ${rowCounts[i]}")
}
assertEquals(rowCount1[0][0], 3)
assertEquals(rowCount2[0][0], 4)
assertEquals(rowCount3[0][0], 4)
assertEquals(rowCount4[0][0], 1)
assertEquals(rowCount5[0][0], 0)
assertEquals(rowCounts[(beginIdx + 0) % 5], 2)
assertEquals(rowCounts[(beginIdx + 1) % 5], 4)
assertEquals(rowCounts[(beginIdx + 2) % 5], 4)
assertEquals(rowCounts[(beginIdx + 3) % 5], 2)
assertEquals(rowCounts[(beginIdx + 4) % 5], 0)
// ${tableName} partitioned table
sql """ DROP TABLE IF EXISTS ${tableName} """
@ -141,69 +141,45 @@ suite("test_insert_random_distribution_table", "p0") {
sql "sync"
totalCount = sql "select count() from ${tableName}"
def partition1 = "p20231011"
def partition2 = "p20231012"
def partition3 = "p20231013"
assertEquals(totalCount[0][0], 5)
res = sql "show tablets from ${tableName} partition ${partition1}"
tablets = getTablets.call(res)
def tabletId11 = tablets[0]
def tabletId12 = tablets[1]
def tabletId13 = tablets[2]
def tabletId14 = tablets[3]
def tabletId15 = tablets[4]
res = sql "show tablets from ${tableName} partition ${partition2}"
tablets = getTablets.call(res)
def tabletId21 = tablets[0]
def tabletId22 = tablets[1]
def tabletId23 = tablets[2]
def tabletId24 = tablets[3]
def tabletId25 = tablets[4]
def partitions = ["p20231011", "p20231012", "p20231013"]
def partitionTablets = []
def partitionRowCounts = []
def partitionBeginIdx = [];
for (int p = 0; p < 3; p++) {
res = sql "show tablets from ${tableName} partition ${partitions[p]}"
partitionTablets[p] = getTablets.call(res)
partitionRowCounts[p] = []
numTablets = partitionTablets[p].size()
for (int i = numTablets - 1; i >= 0; i--) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})"
partitionRowCounts[p][i] = countResult[0][0]
log.info("tablet = ${partitionTablets[p][i]}, partitionRowCounts[${p}][${i}] = " +
"${partitionRowCounts[p][i]}")
if (partitionRowCounts[p][i] > 0 &&
(partitionBeginIdx[p] == (i + 1) || partitionBeginIdx[p] == null)) {
partitionBeginIdx[p] = i
}
}
}
res = sql "show tablets from ${tableName} partition ${partition3}"
tablets = getTablets.call(res)
def tabletId31 = tablets[0]
def tabletId32 = tablets[1]
def tabletId33 = tablets[2]
def tabletId34 = tablets[3]
def tabletId35 = tablets[4]
assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 1)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 1)
for (int i = 3; i < 10; i++) {
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0)
}
assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 1)
for (int i = 2; i < 10; i++) {
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0)
}
def rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})"
def rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})"
def rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})"
def rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})"
def rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})"
def rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})"
def rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})"
def rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})"
def rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})"
def rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})"
def rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})"
def rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})"
def rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})"
def rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})"
def rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})"
assertEquals(rowCount11[0][0], 2)
assertEquals(rowCount12[0][0], 1)
assertEquals(rowCount13[0][0], 0)
assertEquals(rowCount14[0][0], 0)
assertEquals(rowCount15[0][0], 0)
assertEquals(rowCount21[0][0], 1)
assertEquals(rowCount22[0][0], 1)
assertEquals(rowCount23[0][0], 0)
assertEquals(rowCount24[0][0], 0)
assertEquals(rowCount25[0][0], 0)
assertEquals(rowCount31[0][0], 0)
assertEquals(rowCount32[0][0], 0)
assertEquals(rowCount33[0][0], 0)
assertEquals(rowCount34[0][0], 0)
assertEquals(rowCount35[0][0], 0)
for (int i = 0; i < 10; i++) {
assertEquals(partitionRowCounts[2][i], 0)
}
sql "set batch_size=1"
// insert second time
@ -213,41 +189,42 @@ suite("test_insert_random_distribution_table", "p0") {
totalCount = sql "select count() from ${tableName}"
assertEquals(totalCount[0][0], 10)
rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})"
rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})"
rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})"
rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})"
rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})"
rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})"
rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})"
rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})"
rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})"
rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})"
for (int p = 0; p < 3; p++) {
numTablets = partitionTablets[p].size()
for (int i = numTablets - 1; i >= 0; i--) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})"
partitionRowCounts[p][i] = countResult[0][0]
if (p == 2) {
if ((partitionRowCounts[p][i]) > 0 &&
(partitionBeginIdx[p] == (i + 1) || partitionBeginIdx[p] == null)) {
partitionBeginIdx[p] = i
}
}
log.info("tablet = ${partitionTablets[p][i]}, partitionRowCounts[${p}][${i}] = " +
"${partitionRowCounts[p][i]}")
}
}
rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})"
rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})"
rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})"
rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})"
rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})"
assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 2)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 1)
for (int i = 3; i < 10; i++) {
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0)
}
assertEquals(rowCount11[0][0], 2)
assertEquals(rowCount12[0][0], 2)
assertEquals(rowCount13[0][0], 0)
assertEquals(rowCount14[0][0], 0)
assertEquals(rowCount15[0][0], 0)
assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 2)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 2) % 10], 1)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 3) % 10], 1)
for (int i = 4; i < 10; i++) {
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0)
}
assertEquals(rowCount21[0][0], 1)
assertEquals(rowCount22[0][0], 3)
assertEquals(rowCount23[0][0], 1)
assertEquals(rowCount24[0][0], 0)
assertEquals(rowCount25[0][0], 0)
assertEquals(rowCount31[0][0], 0)
assertEquals(rowCount32[0][0], 1)
assertEquals(rowCount33[0][0], 0)
assertEquals(rowCount34[0][0], 0)
assertEquals(rowCount35[0][0], 0)
assertEquals(partitionRowCounts[2][partitionBeginIdx[2]], 1)
for (int i = 1; i < 10; i++) {
assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + i) % 10], 0)
}
sql "set batch_size=1"
// insert third time
@ -258,39 +235,36 @@ suite("test_insert_random_distribution_table", "p0") {
totalCount = sql "select count() from ${tableName}"
assertEquals(totalCount[0][0], 16)
rowCount11 = sql "select count() from ${tableName} tablet(${tabletId11})"
rowCount12 = sql "select count() from ${tableName} tablet(${tabletId12})"
rowCount13 = sql "select count() from ${tableName} tablet(${tabletId13})"
rowCount14 = sql "select count() from ${tableName} tablet(${tabletId14})"
rowCount15 = sql "select count() from ${tableName} tablet(${tabletId15})"
for (int p = 0; p < 3; p++) {
for (int i = 0; i < partitionTablets[p].size(); i++) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[p][i]})"
partitionRowCounts[p][i] = countResult[0][0]
log.info("tablet = ${partitionTablets[p][i]}, partitionRowCounts[${p}][${i}] = " +
"${partitionRowCounts[p][i]}")
}
}
rowCount21 = sql "select count() from ${tableName} tablet(${tabletId21})"
rowCount22 = sql "select count() from ${tableName} tablet(${tabletId22})"
rowCount23 = sql "select count() from ${tableName} tablet(${tabletId23})"
rowCount24 = sql "select count() from ${tableName} tablet(${tabletId24})"
rowCount25 = sql "select count() from ${tableName} tablet(${tabletId25})"
assertEquals(partitionRowCounts[0][partitionBeginIdx[0]], 1)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 1) % 10], 2)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 2) % 10], 2)
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + 3) % 10], 1)
for (int i = 4; i < 10; i++) {
assertEquals(partitionRowCounts[0][(partitionBeginIdx[0] + i) % 10], 0)
}
rowCount31 = sql "select count() from ${tableName} tablet(${tabletId31})"
rowCount32 = sql "select count() from ${tableName} tablet(${tabletId32})"
rowCount33 = sql "select count() from ${tableName} tablet(${tabletId33})"
rowCount34 = sql "select count() from ${tableName} tablet(${tabletId34})"
rowCount35 = sql "select count() from ${tableName} tablet(${tabletId35})"
assertEquals(partitionRowCounts[1][partitionBeginIdx[1]], 1)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 1) % 10], 2)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 2) % 10], 2)
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + 3) % 10], 1)
for (int i = 4; i < 10; i++) {
assertEquals(partitionRowCounts[1][(partitionBeginIdx[1] + i) % 10], 0)
}
assertEquals(rowCount11[0][0], 2)
assertEquals(rowCount12[0][0], 2)
assertEquals(rowCount13[0][0], 2)
assertEquals(rowCount14[0][0], 0)
assertEquals(rowCount15[0][0], 0)
assertEquals(rowCount21[0][0], 1)
assertEquals(rowCount22[0][0], 3)
assertEquals(rowCount23[0][0], 2)
assertEquals(rowCount24[0][0], 0)
assertEquals(rowCount25[0][0], 0)
assertEquals(rowCount31[0][0], 0)
assertEquals(rowCount32[0][0], 1)
assertEquals(rowCount33[0][0], 2)
assertEquals(rowCount34[0][0], 1)
assertEquals(rowCount35[0][0], 0)
assertEquals(partitionRowCounts[2][partitionBeginIdx[2]], 1)
assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 1) % 10], 1)
assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 2) % 10], 1)
assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + 3) % 10], 1)
for (int i = 4; i < 10; i++) {
assertEquals(partitionRowCounts[2][(partitionBeginIdx[2] + i) % 10], 0)
}
}

View File

@ -57,17 +57,28 @@ suite("test_load_to_single_tablet", "p0") {
assertEquals(10, totalCount[0][0])
String[][] res = sql "show tablets from ${tableName}"
res = deduplicate_tablets(res)
def tablet1 = res[0][0]
def tablet2 = res[1][0]
def tablet3 = res[2][0]
def rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
def rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
def rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
assertEquals(10, rowCount1[0][0])
assertEquals(0, rowCount2[0][0])
assertEquals(0, rowCount3[0][0])
def tablets = []
for (int i = 0; i < res.size(); i++) {
tablets.add(res[i][0])
}
def beginIdx = -1
def rowCounts = []
for (int i = tablets.size() - 1; i >= 0; i--) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet: ${tablets[i]}, rowCount: ${rowCounts[i]}")
if (rowCounts[i] > 0 && (beginIdx == -1 || beginIdx == i + 1)) {
beginIdx = i;
}
}
assertEquals(10, rowCounts[beginIdx])
for (int i = 1; i < tablets.size(); i++) {
assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()])
}
// load second time
streamLoad {
@ -82,13 +93,19 @@ suite("test_load_to_single_tablet", "p0") {
}
sql "sync"
totalCount = sql "select count() from ${tableName}"
rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
assertEquals(20, totalCount[0][0])
assertEquals(10, rowCount1[0][0])
assertEquals(10, rowCount2[0][0])
assertEquals(0, rowCount3[0][0])
for (int i = 0; i < tablets.size(); i++) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet: ${tablets[i]}, rowCount: ${rowCounts[i]}")
}
assertEquals(10, rowCounts[beginIdx])
assertEquals(10, rowCounts[(beginIdx + 1) % tablets.size()])
for (int i = 2; i < tablets.size(); i++) {
assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()])
}
// load third time
streamLoad {
@ -103,13 +120,21 @@ suite("test_load_to_single_tablet", "p0") {
}
sql "sync"
totalCount = sql "select count() from ${tableName}"
rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
assertEquals(30, totalCount[0][0])
assertEquals(10, rowCount1[0][0])
assertEquals(10, rowCount2[0][0])
assertEquals(10, rowCount3[0][0])
for (int i = 0; i < tablets.size(); i++) {
def countResult = sql "select count() from ${tableName} tablet(${tablets[i]})"
rowCounts[i] = countResult[0][0]
log.info("tablet: ${tablets[i]}, rowCount: ${rowCounts[i]}")
}
assertEquals(10, rowCounts[beginIdx])
assertEquals(10, rowCounts[(beginIdx + 1) % tablets.size()])
assertEquals(10, rowCounts[(beginIdx + 2) % tablets.size()])
for (int i = 3; i < tablets.size(); i++) {
assertEquals(0, rowCounts[(beginIdx + i) % tablets.size()])
}
// test partitioned table
tableName = "test_load_to_single_tablet_partitioned"
@ -148,29 +173,43 @@ suite("test_load_to_single_tablet", "p0") {
}
sql "sync"
totalCount = sql "select count() from ${tableName}"
assertEquals(10, totalCount[0][0])
def partitionTablets = []
def partitionRowCounts = []
def partitionBeginIdx = []
res = sql "show tablets from ${tableName} partitions(p20231011, p20231012)"
res = deduplicate_tablets(res)
tablet1 = res[0][0]
tablet2 = res[1][0]
tablet3 = res[2][0]
tablet4 = res[10][0]
tablet5 = res[11][0]
tablet6 = res[12][0]
for (int i = 0; i < res.size(); i++) {
if (i % 10 == 0) {
partitionTablets[i/10] = []
partitionRowCounts[i/10] = []
}
partitionTablets[i/10][i%10] = res[i][0]
}
rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
def rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})"
def rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})"
def rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})"
assertEquals(5, rowCount1[0][0])
assertEquals(0, rowCount2[0][0])
assertEquals(0, rowCount3[0][0])
assertEquals(5, rowCount4[0][0])
assertEquals(0, rowCount5[0][0])
assertEquals(0, rowCount6[0][0])
for (int i = 0; i < partitionTablets.size(); i++) {
for (int j = partitionTablets[i].size() - 1; j >= 0; j--) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})"
partitionRowCounts[i][j] = countResult[0][0]
log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}")
if (partitionRowCounts[i][j] > 0 &&
(partitionBeginIdx[i] == null || partitionBeginIdx[i] == j + 1)) {
partitionBeginIdx[i] = j
}
}
}
assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]])
for (int i = 1; i < partitionTablets[0].size(); i++) {
assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()])
}
assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]])
for (int i = 1; i < partitionTablets[1].size(); i++) {
assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()])
}
// load second time
streamLoad {
@ -185,19 +224,27 @@ suite("test_load_to_single_tablet", "p0") {
}
sql "sync"
totalCount = sql "select count() from ${tableName}"
rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})"
rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})"
rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})"
assertEquals(20, totalCount[0][0])
assertEquals(5, rowCount1[0][0])
assertEquals(5, rowCount2[0][0])
assertEquals(0, rowCount3[0][0])
assertEquals(5, rowCount4[0][0])
assertEquals(5, rowCount5[0][0])
assertEquals(0, rowCount6[0][0])
for (int i = 0; i < partitionTablets.size(); i++) {
for (int j = 0; j < partitionTablets[i].size(); j++) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})"
partitionRowCounts[i][j] = countResult[0][0]
log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}")
}
}
assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]])
assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 1) % partitionTablets[0].size()])
for (int i = 2; i < partitionTablets[0].size(); i++) {
assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()])
}
assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]])
assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 1) % partitionTablets[1].size()])
for (int i = 2; i < partitionTablets[1].size(); i++) {
assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()])
}
// load third time
streamLoad {
@ -212,18 +259,29 @@ suite("test_load_to_single_tablet", "p0") {
}
sql "sync"
totalCount = sql "select count() from ${tableName}"
rowCount1 = sql "select count() from ${tableName} tablet(${tablet1})"
rowCount2 = sql "select count() from ${tableName} tablet(${tablet2})"
rowCount3 = sql "select count() from ${tableName} tablet(${tablet3})"
rowCount4 = sql "select count() from ${tableName} tablet(${tablet4})"
rowCount5 = sql "select count() from ${tableName} tablet(${tablet5})"
rowCount6 = sql "select count() from ${tableName} tablet(${tablet6})"
assertEquals(30, totalCount[0][0])
assertEquals(5, rowCount1[0][0])
assertEquals(5, rowCount2[0][0])
assertEquals(5, rowCount3[0][0])
assertEquals(5, rowCount4[0][0])
assertEquals(5, rowCount5[0][0])
assertEquals(5, rowCount6[0][0])
for (int i = 0; i < partitionTablets.size(); i++) {
for (int j = 0; j < partitionTablets[i].size(); j++) {
def countResult = sql "select count() from ${tableName} tablet(${partitionTablets[i][j]})"
partitionRowCounts[i][j] = countResult[0][0]
log.info("tablet: ${partitionTablets[i][j]}, rowCount: ${partitionRowCounts[i][j]}")
}
}
assertEquals(5, partitionRowCounts[0][partitionBeginIdx[0]])
assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 1) % partitionTablets[0].size()])
assertEquals(5, partitionRowCounts[0][(partitionBeginIdx[0] + 2) % partitionTablets[0].size()])
for (int i = 3; i < partitionTablets[0].size(); i++) {
assertEquals(0, partitionRowCounts[0][(partitionBeginIdx[0] + i) % partitionTablets[0].size()])
}
assertEquals(5, partitionRowCounts[1][partitionBeginIdx[1]])
assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 1) % partitionTablets[1].size()])
assertEquals(5, partitionRowCounts[1][(partitionBeginIdx[1] + 2) % partitionTablets[1].size()])
for (int i = 3; i < partitionTablets[1].size(); i++) {
assertEquals(0, partitionRowCounts[1][(partitionBeginIdx[1] + i) % partitionTablets[1].size()])
}
}