From b2e3ecb81d1b1b0242f4e167152baef14ad679f6 Mon Sep 17 00:00:00 2001 From: qiye Date: Mon, 16 Oct 2023 16:43:25 +0800 Subject: [PATCH] [opt](load)change `load_to_single_tablet` tablet search algorithm from random to round-robin (#25256) At present, `load_to_singlt_tablet` import implementation refers to simple random number remainder, which cannot achieve true averaging. This will lead to uneven disk IO and uneven use of cluster resources. To solve this problem, we are preparing to implement round-robin for each partition tablet imported each time, in order to achieve average load to each tablet. When generating the load query plan, the tablet index record currently imported is passed to BE. Add a deamon task in FE to regularly clean up the `loadTabletRecordMap`. The map will get the bucket_number of the partition and update the `load_tablet_index` when `getCurrentLoadTabletIndex`. --- be/src/exec/tablet_info.cpp | 21 +- be/src/exec/tablet_info.h | 4 +- .../java/org/apache/doris/catalog/Env.java | 8 + .../apache/doris/planner/OlapTableSink.java | 13 + .../planner/SingleTabletLoadRecorderMgr.java | 112 +++++++ gensrc/thrift/Descriptors.thrift | 2 + .../test_load_to_single_tablet.json | 10 + .../test_load_to_single_tablet.groovy | 300 ++++++++++++++++++ 8 files changed, 464 insertions(+), 6 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java create mode 100644 regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json create mode 100644 regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 71ca504d3a..3abeecbfe2 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -324,11 +324,18 @@ Status VOlapTablePartitionParam::init() { } } if (_distributed_slot_locs.empty()) { - _compute_tablet_index = [](BlockRow* key, int64_t num_buckets) -> uint32_t { - return butil::fast_rand() % num_buckets; + _compute_tablet_index = [](BlockRow* key, + const VOlapTablePartition& partition) -> uint32_t { + if (partition.load_tablet_idx == -1) { + // load_to_single_tablet = false, just do random + return butil::fast_rand() % partition.num_buckets; + } + // load_to_single_tablet = ture, do round-robin + return partition.load_tablet_idx % partition.num_buckets; }; } else { - _compute_tablet_index = [this](BlockRow* key, int64_t num_buckets) -> uint32_t { + _compute_tablet_index = [this](BlockRow* key, + const VOlapTablePartition& partition) -> uint32_t { uint32_t hash_val = 0; for (int i = 0; i < _distributed_slot_locs.size(); ++i) { auto slot_desc = _slots[_distributed_slot_locs[i]]; @@ -341,7 +348,7 @@ Status VOlapTablePartitionParam::init() { hash_val = HashUtil::zlib_crc_hash_null(hash_val); } } - return hash_val % num_buckets; + return hash_val % partition.num_buckets; }; } @@ -399,7 +406,7 @@ bool VOlapTablePartitionParam::_part_contains(VOlapTablePartition* part, uint32_t VOlapTablePartitionParam::find_tablet(BlockRow* block_row, const VOlapTablePartition& partition) const { - return _compute_tablet_index(block_row, partition.num_buckets); + return _compute_tablet_index(block_row, partition); } Status VOlapTablePartitionParam::_create_partition_keys(const std::vector& t_exprs, @@ -416,6 +423,10 @@ Status VOlapTablePartitionParam::generate_partition_from(const TOlapTablePartiti part_result = _obj_pool.add(new VOlapTablePartition(&_partition_block)); part_result->id = t_part.id; part_result->is_mutable = t_part.is_mutable; + // only load_to_single_tablet = true will set load_tablet_idx + if (t_part.__isset.load_tablet_idx) { + part_result->load_tablet_idx = t_part.load_tablet_idx; + } if (!_is_in_partition) { if (t_part.__isset.start_keys) { diff --git a/be/src/exec/tablet_info.h b/be/src/exec/tablet_info.h index 3e6ab7b94b..ec12dcbfcd 100644 --- a/be/src/exec/tablet_info.h +++ b/be/src/exec/tablet_info.h @@ -124,6 +124,8 @@ struct VOlapTablePartition { int64_t num_buckets = 0; std::vector indexes; bool is_mutable; + // -1 indicates load_to_single_tablet = false + int64_t load_tablet_idx = -1; VOlapTablePartition(vectorized::Block* partition_block) : start_key {partition_block, -1}, end_key {partition_block, -1} {} @@ -191,7 +193,7 @@ private: Status _create_partition_key(const TExprNode& t_expr, BlockRow* part_key, uint16_t pos); - std::function _compute_tablet_index; + std::function _compute_tablet_index; // check if this partition contain this key bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 35c8c0e8e1..4ac7748993 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -207,6 +207,7 @@ import org.apache.doris.persist.TruncateTableInfo; import org.apache.doris.persist.meta.MetaHeader; import org.apache.doris.persist.meta.MetaReader; import org.apache.doris.persist.meta.MetaWriter; +import org.apache.doris.planner.SingleTabletLoadRecorderMgr; import org.apache.doris.plugin.PluginInfo; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.policy.PolicyMgr; @@ -327,6 +328,7 @@ public class Env { private LoadManager loadManager; private ProgressManager progressManager; private StreamLoadRecordMgr streamLoadRecordMgr; + private SingleTabletLoadRecorderMgr singleTabletLoadRecorderMgr; private RoutineLoadManager routineLoadManager; private SqlBlockRuleMgr sqlBlockRuleMgr; private ExportMgr exportMgr; @@ -689,6 +691,7 @@ public class Env { this.progressManager = new ProgressManager(); this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager", Config.fetch_stream_load_record_interval_second * 1000L); + this.singleTabletLoadRecorderMgr = new SingleTabletLoadRecorderMgr(); this.loadEtlChecker = new LoadEtlChecker(loadManager); this.loadLoadingChecker = new LoadLoadingChecker(loadManager); this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager); @@ -1554,6 +1557,7 @@ public class Env { cooldownConfHandler.start(); } streamLoadRecordMgr.start(); + singleTabletLoadRecorderMgr.start(); getInternalCatalog().getIcebergTableCreationRecordMgr().start(); new InternalSchemaInitializer().start(); if (Config.enable_hms_events_incremental_sync) { @@ -3778,6 +3782,10 @@ public class Env { return streamLoadRecordMgr; } + public SingleTabletLoadRecorderMgr getSingleTabletLoadRecorderMgr() { + return singleTabletLoadRecorderMgr; + } + public IcebergTableCreationRecordMgr getIcebergTableCreationRecordMgr() { return getInternalCatalog().getIcebergTableCreationRecordMgr(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 6df438bb25..223f6b45e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -109,6 +109,8 @@ public class OlapTableSink extends DataSink { private boolean isStrictMode = false; + private boolean loadToSingleTablet; + public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds, boolean singleReplicaLoad) { this.dstTable = dstTable; @@ -131,6 +133,7 @@ public class OlapTableSink extends DataSink { "if load_to_single_tablet set to true," + " the olap table must be with random distribution"); } tSink.setLoadToSingleTablet(loadToSingleTablet); + this.loadToSingleTablet = loadToSingleTablet; tDataSink = new TDataSink(getDataSinkType()); tDataSink.setOlapTableSink(tSink); @@ -330,6 +333,11 @@ public class OlapTableSink extends DataSink { tPartition.setNumBuckets(index.getTablets().size()); } tPartition.setIsMutable(table.getPartitionInfo().getIsMutable(partitionId)); + if (loadToSingleTablet) { + int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() + .getCurrentLoadTabletIndex(dbId, table.getId(), partitionId); + tPartition.setLoadTabletIdx(tabletIndex); + } partitionParam.addToPartitions(tPartition); DistributionInfo distInfo = partition.getDistributionInfo(); @@ -384,6 +392,11 @@ public class OlapTableSink extends DataSink { index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList())))); tPartition.setNumBuckets(index.getTablets().size()); } + if (loadToSingleTablet) { + int tabletIndex = Env.getCurrentEnv().getSingleTabletLoadRecorderMgr() + .getCurrentLoadTabletIndex(dbId, table.getId(), partition.getId()); + tPartition.setLoadTabletIdx(tabletIndex); + } partitionParam.addToPartitions(tPartition); partitionParam.setDistributedColumns(getDistColumns(partition.getDistributionInfo())); partitionParam.setEnableAutomaticPartition(false); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java new file mode 100644 index 0000000000..89f21c47dc --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleTabletLoadRecorderMgr.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndex; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.MasterDaemon; + +import lombok.Getter; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; + +public class SingleTabletLoadRecorderMgr extends MasterDaemon { + private static final Logger LOG = LogManager.getLogger(SingleTabletLoadRecorderMgr.class); + private static final long EXPIRY_TIME_INTERVAL_MS = 86400000; // 1 * 24 * 60 * 60 * 1000, 1 days + + // < -> load_tablet_record> + // 0 =< load_tablet_index < number_buckets + private final ConcurrentHashMap, TabletUpdateRecord> loadTabletRecordMap = + new ConcurrentHashMap<>(); + + public SingleTabletLoadRecorderMgr() { + super("single_tablet_load_recorder", EXPIRY_TIME_INTERVAL_MS); + } + + @Override + protected void runAfterCatalogReady() { + long expiryTime = System.currentTimeMillis() - EXPIRY_TIME_INTERVAL_MS; + loadTabletRecordMap.entrySet().removeIf(entry -> + entry.getValue().getUpdateTimestamp() < expiryTime + ); + LOG.info("Remove expired load tablet record successfully."); + } + + public int getCurrentLoadTabletIndex(long dbId, long tableId, long partitionId) throws UserException { + Triple key = Triple.of(dbId, tableId, partitionId); + TabletUpdateRecord record = loadTabletRecordMap.get(key); + int numBuckets = -1; + if (record == null) { + numBuckets = getNumBuckets(dbId, tableId, partitionId); + } + return createOrUpdateLoadTabletRecord(key, numBuckets); + } + + private int getNumBuckets(long dbId, long tableId, long partitionId) throws UserException { + OlapTable olapTable = (OlapTable) Env.getCurrentInternalCatalog().getDb(dbId) + .flatMap(db -> db.getTable(tableId)).filter(t -> t.getType() == TableIf.TableType.OLAP) + .orElse(null); + if (olapTable == null) { + throw new UserException("Olap table[" + dbId + "." + tableId + "] is not exist."); + } + return olapTable.getPartition(partitionId) + .getMaterializedIndices(MaterializedIndex.IndexExtState.ALL) + .get(0).getTablets().size(); + } + + private int createOrUpdateLoadTabletRecord(Triple key, int numBuckets) { + TabletUpdateRecord record = loadTabletRecordMap.compute(key, (k, existingRecord) -> { + if (existingRecord == null) { + return new TabletUpdateRecord(0, numBuckets); + } else { + existingRecord.updateRecord(); + return existingRecord; + } + }); + return record.getTabletIndex(); + } + + static class TabletUpdateRecord { + @Getter + // 0 =< load_tablet_index < number_buckets + int tabletIndex; + int numBuckets; + @Getter + long updateTimestamp = System.currentTimeMillis(); + + TabletUpdateRecord(int tabletIndex, int numBuckets) { + this.tabletIndex = tabletIndex; + this.numBuckets = numBuckets; + } + + public synchronized void updateRecord() { + this.tabletIndex = this.tabletIndex + 1 >= numBuckets ? 0 : this.tabletIndex + 1; + // To reduce the compute time cost, only update timestamp when index is 0 + if (this.tabletIndex == 0) { + this.updateTimestamp = System.currentTimeMillis(); + } + } + + } +} diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index fa391febda..a5ae774bd4 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -176,6 +176,8 @@ struct TOlapTablePartition { 9: optional bool is_mutable = true // only used in List Partition 10: optional bool is_default_partition; + // only used in load_to_single_tablet + 11: optional i64 load_tablet_idx } struct TOlapTablePartitionParam { diff --git a/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json new file mode 100644 index 0000000000..be1ebe5a86 --- /dev/null +++ b/regression-test/data/load_p0/stream_load/test_load_to_single_tablet.json @@ -0,0 +1,10 @@ +{"k1":"2023-10-11", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1} +{"k1":"2023-10-11", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2} +{"k1":"2023-10-11", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3} +{"k1":"2023-10-11", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4} +{"k1":"2023-10-11", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5} +{"k1":"2023-10-12", "k2":"json love anny", "k3":"json", "k4":"anny","k5":1} +{"k1":"2023-10-12", "k2":"andy hate banana", "k3":"andy", "k4":"banana","k5":2} +{"k1":"2023-10-12", "k2":"liam love apple", "k3":"liam", "k4":"apple","k5":3} +{"k1":"2023-10-12", "k2":"tom hate apple", "k3":"tom", "k4":"apple","k5":4} +{"k1":"2023-10-12", "k2":"lisa love pear", "k3":"lisa", "k4":"pear","k5":5} diff --git a/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy new file mode 100644 index 0000000000..fbdf903f67 --- /dev/null +++ b/regression-test/suites/load_p0/stream_load/test_load_to_single_tablet.groovy @@ -0,0 +1,300 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.json.JsonSlurper + +/** + * @Params url is "/xxx" + * @Return response body + */ +def http_get(url) { + def conn = new URL(url).openConnection() + conn.setRequestMethod("GET") + //token for root + return conn.getInputStream().getText() +} + +suite("test_load_to_single_tablet", "p0") { + sql "show tables" + + def tableName = "test_load_to_single_tablet" + + // test unpartitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` text NULL, + `k3` char(50) NULL, + `k4` varchar(200) NULL, + `k5` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY RANDOM BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // load first time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + + sql "sync" + def totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 10) + def res = sql "show tablets from ${tableName}" + def tabletMetaUrl1 = res[0][17] + def tabletMetaUrl2 = res[1][17] + def tabletMetaUrl3 = res[2][17] + def tabletMetaRes1 = http_get(tabletMetaUrl1) + def tabletMetaRes2 = http_get(tabletMetaUrl2) + def tabletMetaRes3 = http_get(tabletMetaUrl3) + + def obj1 = new JsonSlurper().parseText(tabletMetaRes1) + def obj2 = new JsonSlurper().parseText(tabletMetaRes2) + def obj3 = new JsonSlurper().parseText(tabletMetaRes3) + def rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + def rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + def rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 0) + assertEquals(rowCount3, 0) + + // load second time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 20) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 10) + assertEquals(rowCount3, 0) + + // load third time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 30) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows + assertEquals(rowCount1, 10) + assertEquals(rowCount2, 10) + assertEquals(rowCount3, 10) + + // test partitioned table + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` date NULL, + `k2` text NULL, + `k3` char(50) NULL, + `k4` varchar(200) NULL, + `k5` int(11) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + PARTITION BY RANGE(`k1`) + (PARTITION p20231011 VALUES [('2023-10-11'), ('2023-10-12')), + PARTITION p20231012 VALUES [('2023-10-12'), ('2023-10-13')), + PARTITION p20231013 VALUES [('2023-10-13'), ('2023-10-14')), + PARTITION p20231014 VALUES [('2023-10-14'), ('2023-10-15'))) + DISTRIBUTED BY RANDOM BUCKETS 10 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + // load first time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 10) + res = sql "show tablets from ${tableName} partitions(p20231011, p20231012)" + tabletMetaUrl1 = res[0][17] + tabletMetaUrl2 = res[1][17] + tabletMetaUrl3 = res[2][17] + tabletMetaUrl4 = res[10][17] + tabletMetaUrl5 = res[11][17] + tabletMetaUrl6 = res[12][17] + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + def rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + def rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + def rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 0) + assertEquals(rowCount3, 0) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 0) + assertEquals(rowCount6, 0) + + // load second time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 20) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + obj4.rs_metas[2].num_rows + rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + obj5.rs_metas[2].num_rows + rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + obj6.rs_metas[2].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 5) + assertEquals(rowCount3, 0) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 5) + assertEquals(rowCount6, 0) + + // load third time + streamLoad { + table "${tableName}" + + set 'format', 'json' + set 'read_json_by_line', 'true' + set 'load_to_single_tablet', 'true' + + file 'test_load_to_single_tablet.json' + time 10000 // limit inflight 10s + } + sql "sync" + totalCount = sql "select count() from ${tableName}" + assertEquals(totalCount[0][0], 30) + tabletMetaRes1 = http_get(tabletMetaUrl1) + tabletMetaRes2 = http_get(tabletMetaUrl2) + tabletMetaRes3 = http_get(tabletMetaUrl3) + tabletMetaRes4 = http_get(tabletMetaUrl4) + tabletMetaRes5 = http_get(tabletMetaUrl5) + tabletMetaRes6 = http_get(tabletMetaUrl6) + + obj1 = new JsonSlurper().parseText(tabletMetaRes1) + obj2 = new JsonSlurper().parseText(tabletMetaRes2) + obj3 = new JsonSlurper().parseText(tabletMetaRes3) + obj4 = new JsonSlurper().parseText(tabletMetaRes4) + obj5 = new JsonSlurper().parseText(tabletMetaRes5) + obj6 = new JsonSlurper().parseText(tabletMetaRes6) + + rowCount1 = obj1.rs_metas[0].num_rows + obj1.rs_metas[1].num_rows + obj1.rs_metas[2].num_rows + obj1.rs_metas[3].num_rows + rowCount2 = obj2.rs_metas[0].num_rows + obj2.rs_metas[1].num_rows + obj2.rs_metas[2].num_rows + obj2.rs_metas[3].num_rows + rowCount3 = obj3.rs_metas[0].num_rows + obj3.rs_metas[1].num_rows + obj3.rs_metas[2].num_rows + obj3.rs_metas[3].num_rows + rowCount4 = obj4.rs_metas[0].num_rows + obj4.rs_metas[1].num_rows + obj4.rs_metas[2].num_rows + obj4.rs_metas[3].num_rows + rowCount5 = obj5.rs_metas[0].num_rows + obj5.rs_metas[1].num_rows + obj5.rs_metas[2].num_rows + obj5.rs_metas[3].num_rows + rowCount6 = obj6.rs_metas[0].num_rows + obj6.rs_metas[1].num_rows + obj6.rs_metas[2].num_rows + obj6.rs_metas[3].num_rows + assertEquals(rowCount1, 5) + assertEquals(rowCount2, 5) + assertEquals(rowCount3, 5) + assertEquals(rowCount4, 5) + assertEquals(rowCount5, 5) + assertEquals(rowCount6, 5) +} +