From fbc448520a6107ff277073e4447c55d9545e6dd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B7=9D=E6=B5=81?= <113690007+LHG41278@users.noreply.github.com> Date: Sun, 22 Oct 2023 21:04:43 +0800 Subject: [PATCH] [feature](ColdHeatSeperation) Support to upload cold data to HDFS (#22048) --- .gitignore | 3 + be/src/agent/task_worker_pool.cpp | 18 ++ be/src/olap/storage_policy.cpp | 1 - docs/en/docs/advanced/cold-hot-separation.md | 33 ++- .../docs/advanced/cold-hot-separation.md | 35 ++- .../apache/doris/master/ReportHandler.java | 1 + .../apache/doris/policy/StoragePolicy.java | 40 +-- .../doris/task/PushStoragePolicyTask.java | 12 +- gensrc/thrift/AgentService.thrift | 2 + .../load_colddata_to_hdfs.groovy | 230 ++++++++++++++++++ 10 files changed, 353 insertions(+), 22 deletions(-) create mode 100644 regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy diff --git a/.gitignore b/.gitignore index 84fc3064e6..9a35bbe225 100644 --- a/.gitignore +++ b/.gitignore @@ -122,3 +122,6 @@ lru_cache_test /conf/log4j2-spring.xml /fe/fe-core/src/test/resources/real-help-resource.zip /ui/dist + +# other +compile_commands.json diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 8b4aedd32f..ff22f7923c 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -49,6 +50,7 @@ #include "gutil/strings/numbers.h" #include "gutil/strings/substitute.h" #include "io/fs/file_system.h" +#include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" #include "io/fs/path.h" #include "io/fs/s3_file_system.h" @@ -1203,6 +1205,22 @@ void TaskWorkerPool::_push_storage_policy_worker_thread_callback() { .tag("s3_conf", s3_conf.to_string()); put_storage_resource(resource.id, {std::move(fs), resource.version}); } + } else if (resource.__isset.hdfs_storage_param) { + Status st; + std::shared_ptr fs; + if (existed_resource.fs == nullptr) { + st = io::HdfsFileSystem::create(resource.hdfs_storage_param, "", nullptr, &fs); + } else { + fs = std::static_pointer_cast(existed_resource.fs); + } + if (!st.ok()) { + LOG(WARNING) << "update hdfs resource failed: " << st; + } else { + LOG_INFO("successfully update hdfs resource") + .tag("resource_id", resource.id) + .tag("resource_name", resource.name); + put_storage_resource(resource.id, {std::move(fs), resource.version}); + } } else { LOG(WARNING) << "unknown resource=" << resource; } diff --git a/be/src/olap/storage_policy.cpp b/be/src/olap/storage_policy.cpp index 494c93cec1..61c3093abf 100644 --- a/be/src/olap/storage_policy.cpp +++ b/be/src/olap/storage_policy.cpp @@ -48,7 +48,6 @@ Status get_remote_file_system(int64_t storage_policy_id, return Status::InternalError("could not find resource, resouce_id={}", storage_policy->resource_id); } - DCHECK(atol((*fs)->id().c_str()) == storage_policy->resource_id); DCHECK((*fs)->type() != io::FileSystemType::LOCAL); return Status::OK(); } diff --git a/docs/en/docs/advanced/cold-hot-separation.md b/docs/en/docs/advanced/cold-hot-separation.md index 411dc38f2a..64faca74e9 100644 --- a/docs/en/docs/advanced/cold-hot-separation.md +++ b/docs/en/docs/advanced/cold-hot-separation.md @@ -58,7 +58,7 @@ In addition, fe configuration needs to be added: `enable_storage_policy=true` Note: This property will not be synchronized by CCR. If this table is copied by CCR, that is, PROPERTIES contains `is_being_synced = true`, this property will be erased in this table. -For example: +This is an instance that how to create S3 RESOURCE: ``` CREATE RESOURCE "remote_s3" @@ -94,6 +94,36 @@ PROPERTIES( "storage_policy" = "test_policy" ); ``` +and how to create HDFS RESOURCE: +``` +CREATE RESOURCE "remote_hdfs" PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="fs_host:default_fs_port", + "hadoop.username"="hive", + "hadoop.password"="hive", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_prot", + "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + + CREATE STORAGE POLICY test_policy PROPERTIES ( + "storage_resource" = "remote_hdfs", + "cooldown_ttl" = "300" + ) + + CREATE TABLE IF NOT EXISTS create_table_use_created_policy ( + k1 BIGINT, + k2 LARGEINT, + v1 VARCHAR(2048) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "storage_policy" = "test_policy" + ); +``` Or for an existing table, associate the storage policy ``` ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy"); @@ -177,3 +207,4 @@ PROPERTIES "use_path_style" = "true" ); ``` + diff --git a/docs/zh-CN/docs/advanced/cold-hot-separation.md b/docs/zh-CN/docs/advanced/cold-hot-separation.md index b1c662080c..c81c45dc71 100644 --- a/docs/zh-CN/docs/advanced/cold-hot-separation.md +++ b/docs/zh-CN/docs/advanced/cold-hot-separation.md @@ -35,7 +35,7 @@ under the License. 4. 基于普通云盘做高可用,需要实现多副本,某副本异常要做副本迁移。而将数据放到对象存储上则不存在此类问题,因为对象存储是共享的。 ## 解决方案 -在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3上。 +在Partition级别上设置freeze time,表示多久这个Partition会被freeze,并且定义freeze之后存储的remote storage的位置。在be上daemon线程会周期性的判断表是否需要freeze,若freeze后会将数据上传到s3和hdfs上。 冷热分层支持所有doris功能,只是把部分数据放到对象存储上,以节省成本,不牺牲功能。因此有如下特点: @@ -57,7 +57,7 @@ under the License. 注意:这个属性不会被CCR同步,如果这个表是被CCR复制而来的,即PROPERTIES中包含`is_being_synced = true`时,这个属性将会在这个表中被擦除。 -例如: +下面演示如何创建S3 RESOURCE: ``` CREATE RESOURCE "remote_s3" @@ -93,6 +93,36 @@ PROPERTIES( "storage_policy" = "test_policy" ); ``` +以及如何创建 HDFS RESOURCE: +``` +CREATE RESOURCE "remote_hdfs" PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="fs_host:default_fs_port", + "hadoop.username"="hive", + "hadoop.password"="hive", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "nn1_host:rpc_port", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "nn2_host:rpc_port", + "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + + CREATE STORAGE POLICY test_policy PROPERTIES ( + "storage_resource" = "remote_hdfs", + "cooldown_ttl" = "300" + ) + + CREATE TABLE IF NOT EXISTS create_table_use_created_policy ( + k1 BIGINT, + k2 LARGEINT, + v1 VARCHAR(2048) + ) + UNIQUE KEY(k1) + DISTRIBUTED BY HASH (k1) BUCKETS 3 + PROPERTIES( + "storage_policy" = "test_policy" + ); +``` 或者对一个已存在的表,关联storage policy ``` ALTER TABLE create_table_not_have_policy set ("storage_policy" = "test_policy"); @@ -104,6 +134,7 @@ ALTER TABLE create_table_partition MODIFY PARTITION (*) SET("storage_policy"="te **注意**,如果用户在建表时给整张table和部分partition指定了不同的storage policy,partition设置的storage policy会被无视,整张表的所有partition都会使用table的policy. 如果您需要让某个partition的policy和别的不同,则可以使用上文中对一个已存在的partition,关联storage policy的方式修改. 具体可以参考docs目录下[resource](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-RESOURCE.md)、 [policy](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md)、 [create table](../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.md)、 [alter table](../sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.md)等文档,里面有详细介绍 + ### 一些限制 - 单表或单partition只能关联一个storage policy,关联后不能drop掉storage policy,需要先解除二者的关联。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java index 4bfb55e3a8..efca6c7f95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java @@ -306,6 +306,7 @@ public class ReportHandler extends Daemon { // do the diff. find out (intersection) / (be - meta) / (meta - be) List policiesInFe = Env.getCurrentEnv().getPolicyMgr().getCopiedPoliciesByType(PolicyTypeEnum.STORAGE); List resourcesInFe = Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.S3); + resourcesInFe.addAll(Env.getCurrentEnv().getResourceMgr().getResource(ResourceType.HDFS)); List resourceToPush = new ArrayList<>(); List policyToPush = new ArrayList<>(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java index 4f1cbd96b7..b86b3dc490 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java @@ -19,6 +19,7 @@ package org.apache.doris.policy; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Resource.ReferenceType; import org.apache.doris.catalog.ScalarType; @@ -177,29 +178,38 @@ public class StoragePolicy extends Policy { this.cooldownTtl = getSecondsByCooldownTtl(props.get(COOLDOWN_TTL)); } - checkIsS3ResourceAndExist(this.storageResource); + checkResourceIsExist(this.storageResource); if (!addResourceReference() && !ifNotExists) { - throw new AnalysisException("this policy has been added to s3 resource once, policy has been created."); + throw new AnalysisException("this policy has been added to s3 or hdfs resource, policy has been created."); } } - private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException { - // check storage_resource type is S3, current just support S3 + private static Resource checkResourceIsExist(final String storageResource) throws AnalysisException { Resource resource = Optional.ofNullable(Env.getCurrentEnv().getResourceMgr().getResource(storageResource)) .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource)); - if (resource.getType() != Resource.ResourceType.S3) { - throw new AnalysisException("current storage policy just support resource type S3_COOLDOWN"); - } Map properties = resource.getCopiedProperties(); - if (!properties.containsKey(S3Properties.ROOT_PATH)) { - throw new AnalysisException(String.format( - "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource)); - } - if (!properties.containsKey(S3Properties.BUCKET)) { - throw new AnalysisException(String.format( - "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource)); + switch (resource.getType()) { + case S3: + if (!properties.containsKey(S3Properties.ROOT_PATH)) { + throw new AnalysisException(String.format( + "Missing [%s] in '%s' resource", S3Properties.ROOT_PATH, storageResource)); + } + if (!properties.containsKey(S3Properties.BUCKET)) { + throw new AnalysisException(String.format( + "Missing [%s] in '%s' resource", S3Properties.BUCKET, storageResource)); + } + break; + case HDFS: + if (!properties.containsKey(HdfsResource.HADOOP_FS_NAME)) { + throw new AnalysisException(String.format( + "Missing [%s] in '%s' resource", HdfsResource.HADOOP_FS_NAME, storageResource)); + } + break; + default: + throw new AnalysisException( + "current storage policy just support resource type S3_COOLDOWN or HDFS_COOLDOWN"); } return resource; } @@ -343,7 +353,7 @@ public class StoragePolicy extends Policy { String storageResource = properties.get(STORAGE_RESOURCE); if (storageResource != null) { - checkIsS3ResourceAndExist(storageResource); + checkResourceIsExist(storageResource); } if (this.policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && this.storageResource == null && storageResource == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java index e422a409e1..4f1bec5d87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/PushStoragePolicyTask.java @@ -18,6 +18,7 @@ package org.apache.doris.task; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.HdfsResource; import org.apache.doris.catalog.Resource; import org.apache.doris.catalog.Resource.ResourceType; import org.apache.doris.datasource.property.constants.S3Properties; @@ -62,8 +63,9 @@ public class PushStoragePolicyTask extends AgentTask { StoragePolicy storagePolicy = (StoragePolicy) p; String resourceName = storagePolicy.getStorageResource(); Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(resourceName); - if (resource == null || resource.getType() != ResourceType.S3) { - LOG.warn("can't find s3 resource by name {}", resourceName); + if (resource == null || (resource.getType() != ResourceType.S3 + && resource.getType() != ResourceType.HDFS)) { + LOG.warn("can't find s3 resource or hdfs resource by name {}", resourceName); return; } item.setResourceId(resource.getId()); @@ -85,7 +87,11 @@ public class PushStoragePolicyTask extends AgentTask { item.setId(r.getId()); item.setName(r.getName()); item.setVersion(r.getVersion()); - item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties())); + if (r.getType() == ResourceType.S3) { + item.setS3StorageParam(S3Properties.getS3TStorageParam(r.getCopiedProperties())); + } else if (r.getType() == ResourceType.HDFS) { + item.setHdfsStorageParam(HdfsResource.generateHdfsParam(r.getCopiedProperties())); + } r.readUnlock(); tStorageResources.add(item); }); diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index a61c951256..4b4c260b47 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -60,6 +60,7 @@ enum TTabletType { TABLET_TYPE_MEMORY = 1 } + struct TS3StorageParam { 1: optional string endpoint 2: optional string region @@ -87,6 +88,7 @@ struct TStorageResource { 2: optional string name 3: optional i64 version // alter version 4: optional TS3StorageParam s3_storage_param + 5: optional PlanNodes.THdfsParams hdfs_storage_param // more storage resource type } diff --git a/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy new file mode 100644 index 0000000000..14c5939dfc --- /dev/null +++ b/regression-test/suites/cold_heat_separation_p2/load_colddata_to_hdfs.groovy @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +import groovy.json.JsonSlurper +import org.codehaus.groovy.runtime.IOGroovyMethods + +suite("load_colddata_to_hdfs") { + if (!enableHdfs()) { + logger.info("skip this case because hdfs is not enabled"); + } + + def fetchBeHttp = { check_func, meta_url -> + def i = meta_url.indexOf("/api") + String endPoint = meta_url.substring(0, i) + String metaUri = meta_url.substring(i) + i = endPoint.lastIndexOf('/') + endPoint = endPoint.substring(i + 1) + httpTest { + endpoint endPoint + uri metaUri + op "get" + check check_func + } + } + // data_sizes is one arrayList, t is tablet + def fetchDataSize = { data_sizes, t -> + def tabletId = t[0] + String meta_url = t[17] + def clos = { respCode, body -> + logger.info("test ttl expired resp Code {}", "${respCode}".toString()) + assertEquals("${respCode}".toString(), "200") + String out = "${body}".toString() + def obj = new JsonSlurper().parseText(out) + data_sizes[0] = obj.local_data_size + data_sizes[1] = obj.remote_data_size + } + fetchBeHttp(clos, meta_url.replace("header", "data_size")) + } + // used as passing out parameter to fetchDataSize + List sizes = [-1, -1] + def tableName = "lineitem2" + sql """ DROP TABLE IF EXISTS ${tableName} """ + def stream_load_one_part = { partnum -> + streamLoad { + table tableName + // a default db 'regression_test' is specified in + // ${DORIS_HOME}/conf/regression-conf.groovy + + // default label is UUID: + // set 'label' UUID.randomUUID().toString() + + // default column_separator is specify in doris fe config, usually is '\t'. + // this line change to ',' + set 'column_separator', '|' + set 'compress_type', 'GZ' + + + // relate to ${DORIS_HOME}/regression-test/data/demo/streamload_input.csv. + // also, you can stream load a http stream, e.g. http://xxx/some.csv + file """${getS3Url()}/regression/tpch/sf1/lineitem.csv.split${partnum}.gz""" + time 10000 // limit inflight 10s + + // stream load action will check result, include Success status, and NumberTotalRows == NumberLoadedRows + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def load_lineitem_table = { + stream_load_one_part("00") + stream_load_one_part("01") + def tablets = sql """ + SHOW TABLETS FROM ${tableName} + """ + while (tablets[0][8] == "0") { + log.info( "test local size is zero, sleep 10s") + sleep(10000) + tablets = sql """ + SHOW TABLETS FROM ${tableName} + """ + } + } + + def check_storage_policy_exist = { name-> + def polices = sql""" + show storage policy; + """ + for (p in polices) { + if (name == p[0]) { + return true; + } + } + return false; + } + + def resource_name = "test_table_with_data_resource" + def policy_name= "test_table_with_data_policy" + + if (check_storage_policy_exist(policy_name)) { + sql """ + DROP STORAGE POLICY ${policy_name} + """ + } + + def has_resouce = sql """ + SHOW RESOURCES WHERE NAME = "${resource_name}"; + """ + if (has_resouce.size() > 0) { + sql """ + DROP RESOURCE ${resource_name} + """ + } + + sql """ + CREATE RESOURCE IF NOT EXISTS ${resource_name} + PROPERTIES ( + "type"="hdfs", + "fs.defaultFS"="127.0.0.1:8120", + "hadoop.username"="hive", + "hadoop.password"="hive", + "dfs.nameservices" = "my_ha", + "dfs.ha.namenodes.my_ha" = "my_namenode1, my_namenode2", + "dfs.namenode.rpc-address.my_ha.my_namenode1" = "127.0.0.1:10000", + "dfs.namenode.rpc-address.my_ha.my_namenode2" = "127.0.0.1:10000", + "dfs.client.failover.proxy.provider" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider" + ); + """ + + sql """ + CREATE STORAGE POLICY IF NOT EXISTS ${policy_name} + PROPERTIES( + "storage_resource" = "${resource_name}", + "cooldown_ttl" = "300" + ) + """ + + // test one replica + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + L_ORDERKEY INTEGER NOT NULL, + L_PARTKEY INTEGER NOT NULL, + L_SUPPKEY INTEGER NOT NULL, + L_LINENUMBER INTEGER NOT NULL, + L_QUANTITY DECIMAL(15,2) NOT NULL, + L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL, + L_DISCOUNT DECIMAL(15,2) NOT NULL, + L_TAX DECIMAL(15,2) NOT NULL, + L_RETURNFLAG CHAR(1) NOT NULL, + L_LINESTATUS CHAR(1) NOT NULL, + L_SHIPDATE DATE NOT NULL, + L_COMMITDATE DATE NOT NULL, + L_RECEIPTDATE DATE NOT NULL, + L_SHIPINSTRUCT CHAR(25) NOT NULL, + L_SHIPMODE CHAR(10) NOT NULL, + L_COMMENT VARCHAR(44) NOT NULL + ) + DUPLICATE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER) + DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3 + PROPERTIES ( + "replication_num" = "1", + "storage_policy" = "${policy_name}" + ) + """ + + load_lineitem_table() + + // show tablets from table, 获取第一个tablet的 LocalDataSize1 + tablets = sql """ + SHOW TABLETS FROM ${tableName} + """ + log.info( "test tablets not empty") + assertTrue(tablets.size() > 0) + fetchDataSize(sizes, tablets[0]) + def LocalDataSize1 = sizes[0] + + // 等待10min,show tablets from table, 预期RemoteDataSize 不为0,且等于LocalDataSize1,预期LocalDataSize 为0 + sleep(600000) + + + tablets = sql """ + SHOW TABLETS FROM ${tableName} + """ + log.info( "test tablets not empty") + fetchDataSize(sizes, tablets[0]) + while (sizes[1] == 0) { + log.info( "test remote size is zero, sleep 10s") + sleep(10000) + tablets = sql """ + SHOW TABLETS FROM ${tableName} + """ + fetchDataSize(sizes, tablets[0]) + } + assertTrue(tablets.size() > 0) + log.info( "test remote size not zero") + assertEquals(LocalDataSize1, sizes[1]) + log.info( "test local size is zero") + assertEquals(0, sizes[0]) + + + sql """ + DROP TABLE ${tableName} + """ + + +}