From 44ba9e102cd93947e033ddfdaf3478eb68560ec7 Mon Sep 17 00:00:00 2001 From: wuwenchi Date: Wed, 17 Jan 2024 17:43:05 +0800 Subject: [PATCH] [feature](statistics)support statistics for iceberg/paimon/hudi table (#29868) --- .../docker-compose/iceberg/iceberg.env | 2 +- .../docker-compose/iceberg/iceberg.yaml.tpl | 2 + .../external/IcebergExternalTable.java | 9 + .../catalog/external/PaimonExternalTable.java | 9 + .../doris/common/proc/CatalogsProcDir.java | 10 +- .../statistics/ExternalAnalysisTask.java | 263 ++++++++++++++++++ .../doris/statistics/HMSAnalysisTask.java | 252 ++--------------- .../doris/statistics/util/StatisticsUtil.java | 10 +- .../doris/statistics/HMSAnalysisTaskTest.java | 4 +- regression-test/conf/regression-conf.groovy | 1 + .../iceberg/test_iceberg_statistics.out | 39 +++ .../paimon/test_paimon_statistics.out | 21 ++ .../hive/test_hive_hudi_statistics.out | 16 ++ .../pipeline/p0/conf/regression-conf.groovy | 1 + .../tpch-sf100/conf/regression-conf.groovy | 1 + .../plugins/plugins_get_ids_from_proc.groovy | 62 +++++ .../iceberg/test_iceberg_statistics.groovy | 57 ++++ .../paimon/test_paimon_catalog.groovy | 3 + .../paimon/test_paimon_statistics.groovy | 47 ++++ .../external_table_p0/test_catalog_ddl.groovy | 2 + .../hive/test_hive_hudi_statistics.groovy | 47 ++++ 21 files changed, 621 insertions(+), 237 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java create mode 100644 regression-test/data/external_table_p0/iceberg/test_iceberg_statistics.out create mode 100644 regression-test/data/external_table_p0/paimon/test_paimon_statistics.out create mode 100644 regression-test/data/external_table_p2/hive/test_hive_hudi_statistics.out create mode 100644 regression-test/plugins/plugins_get_ids_from_proc.groovy create mode 100644 regression-test/suites/external_table_p0/iceberg/test_iceberg_statistics.groovy create mode 100644 regression-test/suites/external_table_p0/paimon/test_paimon_statistics.groovy create mode 100644 regression-test/suites/external_table_p2/hive/test_hive_hudi_statistics.groovy diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.env b/docker/thirdparties/docker-compose/iceberg/iceberg.env index 4cc8b42eaf..6bebd49f43 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.env +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.env @@ -21,4 +21,4 @@ SPARK_DRIVER_UI_PORT=8080 SPARK_HISTORY_UI_PORT=10000 REST_CATALOG_PORT=18181 MINIO_UI_PORT=9000 -MINIO_API_PORT=9001 +MINIO_API_PORT=19001 diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl index d7220f2437..bc217c1dd6 100644 --- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl +++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl @@ -58,6 +58,8 @@ services: minio: image: minio/minio container_name: doris--minio + ports: + - ${MINIO_API_PORT}:9000 environment: - MINIO_ROOT_USER=admin - MINIO_ROOT_PASSWORD=password diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java index 7398ff19c9..be99e26de6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalTable.java @@ -23,7 +23,10 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper; import 
org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.iceberg.IcebergExternalCatalog; +import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.statistics.ColumnStatistic; +import org.apache.doris.statistics.ExternalAnalysisTask; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TIcebergTable; @@ -149,4 +152,10 @@ public class IcebergExternalTable extends ExternalTable { () -> StatisticsUtil.getIcebergColumnStats(colName, ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name))); } + + @Override + public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { + makeSureInitialized(); + return new ExternalAnalysisTask(info); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java index c8ea253671..b517265df6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalTable.java @@ -21,6 +21,9 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.datasource.paimon.PaimonExternalCatalog; +import org.apache.doris.statistics.AnalysisInfo; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.ExternalAnalysisTask; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -154,4 +157,10 @@ public class PaimonExternalTable extends ExternalTable { + getPaimonCatalogType()); } } + + @Override + public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) { + makeSureInitialized(); + return new ExternalAnalysisTask(info); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CatalogsProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CatalogsProcDir.java index 854b4dddc7..e6163645c2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/CatalogsProcDir.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/CatalogsProcDir.java @@ -27,6 +27,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.log4j.Logger; import java.util.ArrayList; import java.util.Collections; @@ -37,6 +38,7 @@ import java.util.List; * show all catalogs' info */ public class CatalogsProcDir implements ProcDirInterface { + private static final Logger LOG = Logger.getLogger(CatalogsProcDir.class); public static final ImmutableList TITLE_NAMES = new ImmutableList.Builder() .add("CatalogIds").add("CatalogName").add("DatabaseNum").add("LastUpdateTime") .build(); @@ -90,7 +92,13 @@ public class CatalogsProcDir implements ProcDirInterface { List catalogInfo = Lists.newArrayList(); catalogInfo.add(catalog.getId()); catalogInfo.add(catalog.getName()); - catalogInfo.add(catalog.getDbNames().size()); + int size = -1; + try { + size = catalog.getDbNames().size(); + } catch (Exception e) { + LOG.warn("failed to get database: ", e); + } + catalogInfo.add(size); catalogInfo.add(TimeUtils.longToTimeString(catalog.getLastUpdateTime())); catalogInfos.add(catalogInfo); } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java
new file mode 100644
index 0000000000..15848c013d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ExternalAnalysisTask.java
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.external.ExternalTable;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.Pair;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import org.apache.commons.text.StringSubstitutor;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+public class ExternalAnalysisTask extends BaseAnalysisTask {
+    private static final Logger LOG = LogManager.getLogger(ExternalAnalysisTask.class);
+
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}";
+    private boolean isTableLevelTask;
+    private boolean isPartitionOnly;
+    private ExternalTable table;
+
+    // For test
+    public ExternalAnalysisTask() {
+    }
+
+    public ExternalAnalysisTask(AnalysisInfo info) {
+        super(info);
+        isTableLevelTask = info.externalTableLevelTask;
+        isPartitionOnly = info.partitionOnly;
+        table = (ExternalTable) tbl;
+    }
+
+    public void doExecute() throws Exception {
+        if (isTableLevelTask) {
+            getTableStats();
+        } else {
+            getOrdinaryColumnStats();
+        }
+    }
+
+    // For test
+    protected void setTable(ExternalTable table) {
+        this.table = table;
+    }
+
+    /**
+     * Get table row count
+     */
+    private void getTableStats() {
+        Map<String, String> params = buildStatsParams(null);
+        List<ResultRow> columnResult =
+                StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                        .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
+        String rowCount = columnResult.get(0).get(0);
+        Env.getCurrentEnv().getAnalysisManager()
+                .updateTableStatsStatus(
+                        new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
+        job.rowCountDone(this);
+    }
+
+    // Get ordinary column stats
+    protected void getOrdinaryColumnStats() throws Exception {
+        StringBuilder sb = new StringBuilder();
+        Map<String, String> params = buildStatsParams("NULL");
+        params.put("min", getMinFunction());
+        params.put("max", getMaxFunction());
+        params.put("dataSizeFunction", getDataSizeFunction(col, false));
+        Pair<Double, Long> sampleInfo = getSampleInfo();
+        params.put("scaleFactor", String.valueOf(sampleInfo.first));
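+        // The params map feeds org.apache.commons.text.StringSubstitutor below:
+        // each ${key} placeholder in the selected SQL template is replaced with
+        // params.get("key"), so ANALYZE_TABLE_COUNT_TEMPLATE above expands to
+        // something like
+        //   SELECT ROUND(COUNT(1) * 1.0) as rowCount FROM `ctl`.`db`.`tbl`
+        // (the catalog/db/table names here are illustrative).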
+        StringSubstitutor stringSubstitutor;
+        if (tableSample == null) {
+            // Do full analyze
+            LOG.debug("Will do full collection for column {}", col.getName());
+            sb.append(COLLECT_COL_STATISTICS);
+        } else {
+            // Do sample analyze
+            LOG.debug("Will do sample collection for column {}", col.getName());
+            boolean limitFlag = false;
+            boolean bucketFlag = false;
+            // If the sample size is too large, use LIMIT to control it.
+            if (needLimit(sampleInfo.second, sampleInfo.first)) {
+                limitFlag = true;
+                long columnSize = 0;
+                for (Column column : table.getFullSchema()) {
+                    columnSize += column.getDataType().getSlotSize();
+                }
+                double targetRows = (double) sampleInfo.second / columnSize;
+                // Estimate the new scaleFactor based on the schema.
+                if (targetRows > StatisticsUtil.getHugeTableSampleRows()) {
+                    params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows());
+                    params.put("scaleFactor",
+                            String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows()));
+                }
+            }
+            // A single distribution column is not suitable for the DUJ1 estimator; use the linear estimator instead.
+            Set<String> distributionColumns = tbl.getDistributionColumnNames();
+            if (distributionColumns.size() == 1 && distributionColumns.contains(col.getName().toLowerCase())) {
+                bucketFlag = true;
+                sb.append(LINEAR_ANALYZE_TEMPLATE);
+                params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
+                params.put("rowCount", "ROUND(count(1) * ${scaleFactor})");
+            } else {
+                sb.append(DUJ1_ANALYZE_TEMPLATE);
+                params.put("dataSizeFunction", getDataSizeFunction(col, true));
+                params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})"));
+                params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})");
+            }
+            LOG.info("Sample for column [{}]. Scale factor [{}], "
+                    + "limited [{}], is distribution column [{}]",
+                    col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag);
+        }
+        stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(sb.toString());
+        runQuery(sql);
+    }
+
+    protected Map<String, String> buildStatsParams(String partId) {
+        Map<String, String> commonParams = new HashMap<>();
+        String id = StatisticsUtil.constructId(tbl.getId(), -1);
+        if (partId == null) {
+            commonParams.put("partId", "NULL");
+        } else {
+            id = StatisticsUtil.constructId(id, partId);
+            commonParams.put("partId", "\'" + partId + "\'");
+        }
+        commonParams.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        commonParams.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        commonParams.put("id", id);
+        commonParams.put("catalogId", String.valueOf(catalog.getId()));
+        commonParams.put("dbId", String.valueOf(db.getId()));
+        commonParams.put("tblId", String.valueOf(tbl.getId()));
+        commonParams.put("indexId", "-1");
+        commonParams.put("idxId", "-1");
+        commonParams.put("colName", info.colName);
+        commonParams.put("colId", info.colName);
+        commonParams.put("catalogName", catalog.getName());
+        commonParams.put("dbName", db.getFullName());
+        commonParams.put("tblName", tbl.getName());
+        commonParams.put("sampleHints", getSampleHint());
+        commonParams.put("limit", "");
+        commonParams.put("scaleFactor", "1");
+        if (col != null) {
+            commonParams.put("type", col.getType().toString());
+        }
+        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
+        return commonParams;
+    }
+
+    protected String getSampleHint() {
+        if (tableSample == null) {
+            return "";
+        }
+        if (tableSample.isPercent()) {
+            return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue());
+        } else {
return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue()); + } + } + + /** + * Get the pair of sample scale factor and the file size going to sample. + * While analyzing, the result of count, null count and data size need to + * multiply this scale factor to get more accurate result. + * @return Pair of sample scale factor and the file size going to sample. + */ + protected Pair getSampleInfo() { + if (tableSample == null) { + return Pair.of(1.0, 0L); + } + long target; + // Get list of all files' size in this HMS table. + List chunkSizes = table.getChunkSizes(); + Collections.shuffle(chunkSizes, new Random(tableSample.getSeek())); + long total = 0; + // Calculate the total size of this HMS table. + for (long size : chunkSizes) { + total += size; + } + if (total == 0) { + return Pair.of(1.0, 0L); + } + // Calculate the sample target size for percent and rows sample. + if (tableSample.isPercent()) { + target = total * tableSample.getSampleValue() / 100; + } else { + int columnSize = 0; + for (Column column : table.getFullSchema()) { + columnSize += column.getDataType().getSlotSize(); + } + target = columnSize * tableSample.getSampleValue(); + } + // Calculate the actual sample size (cumulate). + long cumulate = 0; + for (long size : chunkSizes) { + cumulate += size; + if (cumulate >= target) { + break; + } + } + return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate); + } + + @Override + protected void afterExecution() { + // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata. + // Partition only task doesn't need to refresh cached. + if (isTableLevelTask || isPartitionOnly) { + return; + } + Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); + } + + /** + * If the size to sample is larger than LIMIT_SIZE (1GB) + * and is much larger (1.2*) than the size user want to sample, + * use limit to control the total sample size. + * @param sizeToRead The file size to sample. + * @param factor sizeToRead * factor = Table total size. + * @return True if need to limit. 
+ */ + protected boolean needLimit(long sizeToRead, double factor) { + long total = (long) (sizeToRead * factor); + long target; + if (tableSample.isPercent()) { + target = total * tableSample.getSampleValue() / 100; + } else { + int columnSize = 0; + for (Column column : table.getFullSchema()) { + columnSize += column.getDataType().getSlotSize(); + } + target = columnSize * tableSample.getSampleValue(); + } + if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) { + return true; + } + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index fd0a4c8253..9e8be62282 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -17,12 +17,10 @@ package org.apache.doris.statistics; -import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.external.ExternalTable; import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.FeConstants; -import org.apache.doris.common.Pair; import org.apache.doris.datasource.hive.HiveMetaStoreCache; import org.apache.doris.external.hive.util.HiveUtil; import org.apache.doris.statistics.util.StatisticsUtil; @@ -32,64 +30,36 @@ import org.apache.commons.text.StringSubstitutor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; -public class HMSAnalysisTask extends BaseAnalysisTask { +public class HMSAnalysisTask extends ExternalAnalysisTask { private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class); + private HMSExternalTable hmsExternalTable; - private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount " - + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}"; - private boolean isTableLevelTask; - private boolean isPartitionOnly; - private HMSExternalTable table; - + // for test public HMSAnalysisTask() { } public HMSAnalysisTask(AnalysisInfo info) { super(info); - isTableLevelTask = info.externalTableLevelTask; - isPartitionOnly = info.partitionOnly; - table = (HMSExternalTable) tbl; + hmsExternalTable = (HMSExternalTable) tbl; } - public void doExecute() throws Exception { - if (isTableLevelTask) { - getTableStats(); - } else { - getTableColumnStats(); - } + private boolean isPartitionColumn() { + return hmsExternalTable.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName())); } // For test protected void setTable(HMSExternalTable table) { - this.table = table; + setTable((ExternalTable) table); + this.hmsExternalTable = table; } - /** - * Get table row count - */ - private void getTableStats() throws Exception { - Map params = buildStatsParams(null); - List columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); - String rowCount = columnResult.get(0).get(0); - Env.getCurrentEnv().getAnalysisManager() - .updateTableStatsStatus( - new TableStatsMeta(Long.parseLong(rowCount), info, tbl)); - job.rowCountDone(this); - } - /** - * Get column statistics and insert the result to __internal_schema.column_statistics - */ - protected 
void getTableColumnStats() throws Exception { + @Override + protected void getOrdinaryColumnStats() throws Exception { if (!info.usingSqlForPartitionColumn) { try { if (isPartitionColumn()) { @@ -102,77 +72,17 @@ public class HMSAnalysisTask extends BaseAnalysisTask { + "fallback to normal collection", isPartitionColumn() ? "partition " : "", col.getName(), e); /* retry using sql way! */ - getOrdinaryColumnStats(); + super.getOrdinaryColumnStats(); } } else { - getOrdinaryColumnStats(); + super.getOrdinaryColumnStats(); } } - private boolean isPartitionColumn() { - return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName())); - } - - // Get ordinary column stats. Ordinary column means not partition column. - private void getOrdinaryColumnStats() throws Exception { - StringBuilder sb = new StringBuilder(); - Map params = buildStatsParams("NULL"); - params.put("min", getMinFunction()); - params.put("max", getMaxFunction()); - params.put("dataSizeFunction", getDataSizeFunction(col, false)); - Pair sampleInfo = getSampleInfo(); - params.put("scaleFactor", String.valueOf(sampleInfo.first)); - StringSubstitutor stringSubstitutor; - if (tableSample == null) { - // Do full analyze - LOG.debug("Will do full collection for column {}", col.getName()); - sb.append(COLLECT_COL_STATISTICS); - } else { - // Do sample analyze - LOG.debug("Will do sample collection for column {}", col.getName()); - boolean limitFlag = false; - boolean bucketFlag = false; - // If sample size is too large, use limit to control the sample size. - if (needLimit(sampleInfo.second, sampleInfo.first)) { - limitFlag = true; - long columnSize = 0; - for (Column column : table.getFullSchema()) { - columnSize += column.getDataType().getSlotSize(); - } - double targetRows = (double) sampleInfo.second / columnSize; - // Estimate the new scaleFactor based on the schema. - if (targetRows > StatisticsUtil.getHugeTableSampleRows()) { - params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows()); - params.put("scaleFactor", - String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows())); - } - } - // Single distribution column is not fit for DUJ1 estimator, use linear estimator. - Set distributionColumns = tbl.getDistributionColumnNames(); - if (distributionColumns.size() == 1 && distributionColumns.contains(col.getName().toLowerCase())) { - bucketFlag = true; - sb.append(LINEAR_ANALYZE_TEMPLATE); - params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})"); - params.put("rowCount", "ROUND(count(1) * ${scaleFactor})"); - } else { - sb.append(DUJ1_ANALYZE_TEMPLATE); - params.put("dataSizeFunction", getDataSizeFunction(col, true)); - params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})")); - params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})"); - } - LOG.info("Sample for column [{}]. Scale factor [{}], " - + "limited [{}], is distribute column [{}]", - col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag); - } - stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(sb.toString()); - runQuery(sql); - } - // Collect the partition column stats through HMS metadata. // Get all the partition values and calculate the stats based on the values. 
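+    // The partition-column estimate below never scans data files: min/max come from
+    // the range of the partition values themselves, NDV from the distinct partition
+    // value count, and null count / data size are extrapolated by the estimated row
+    // count, so the result is approximate when table stats are empty.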
private void getPartitionColumnStats() throws Exception { - Set partitionNames = table.getPartitionNames(); + Set partitionNames = hmsExternalTable.getPartitionNames(); Set ndvPartValues = Sets.newHashSet(); long numNulls = 0; long dataSize = 0; @@ -198,8 +108,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask { } } // Estimate the row count. This value is inaccurate if the table stats is empty. - TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount; + TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager() + .findTableStatsStatus(hmsExternalTable.getId()); + long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount; dataSize = dataSize * count / partitionNames.size(); numNulls = numNulls * count / partitionNames.size(); int ndv = ndvPartValues.size(); @@ -218,8 +129,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask { // Collect the spark analyzed column stats through HMS metadata. private void getHmsColumnStats() throws Exception { - TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId()); - long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount; + TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager() + .findTableStatsStatus(hmsExternalTable.getId()); + long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount; Map params = buildStatsParams("NULL"); Map statsParams = new HashMap<>(); @@ -229,7 +141,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { statsParams.put(StatsType.MAX_VALUE, "max"); statsParams.put(StatsType.AVG_SIZE, "avg_len"); - if (table.fillColumnStatistics(info.colName, statsParams, params)) { + if (hmsExternalTable.fillColumnStatistics(info.colName, statsParams, params)) { throw new AnalysisException("some column stats not available"); } @@ -283,126 +195,4 @@ public class HMSAnalysisTask extends BaseAnalysisTask { } return value.compareTo(currentMax) > 0 ? 
value : currentMax; } - - private Map buildStatsParams(String partId) { - Map commonParams = new HashMap<>(); - String id = StatisticsUtil.constructId(tbl.getId(), -1); - if (partId == null) { - commonParams.put("partId", "NULL"); - } else { - id = StatisticsUtil.constructId(id, partId); - commonParams.put("partId", "\'" + partId + "\'"); - } - commonParams.put("internalDB", FeConstants.INTERNAL_DB_NAME); - commonParams.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - commonParams.put("id", id); - commonParams.put("catalogId", String.valueOf(catalog.getId())); - commonParams.put("dbId", String.valueOf(db.getId())); - commonParams.put("tblId", String.valueOf(tbl.getId())); - commonParams.put("indexId", "-1"); - commonParams.put("idxId", "-1"); - commonParams.put("colName", info.colName); - commonParams.put("colId", info.colName); - commonParams.put("catalogName", catalog.getName()); - commonParams.put("dbName", db.getFullName()); - commonParams.put("tblName", tbl.getName()); - commonParams.put("sampleHints", getSampleHint()); - commonParams.put("limit", ""); - commonParams.put("scaleFactor", "1"); - if (col != null) { - commonParams.put("type", col.getType().toString()); - } - commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); - return commonParams; - } - - protected String getSampleHint() { - if (tableSample == null) { - return ""; - } - if (tableSample.isPercent()) { - return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue()); - } else { - return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue()); - } - } - - /** - * Get the pair of sample scale factor and the file size going to sample. - * While analyzing, the result of count, null count and data size need to - * multiply this scale factor to get more accurate result. - * @return Pair of sample scale factor and the file size going to sample. - */ - protected Pair getSampleInfo() { - if (tableSample == null) { - return Pair.of(1.0, 0L); - } - long target; - // Get list of all files' size in this HMS table. - List chunkSizes = table.getChunkSizes(); - Collections.shuffle(chunkSizes, new Random(tableSample.getSeek())); - long total = 0; - // Calculate the total size of this HMS table. - for (long size : chunkSizes) { - total += size; - } - if (total == 0) { - return Pair.of(1.0, 0L); - } - // Calculate the sample target size for percent and rows sample. - if (tableSample.isPercent()) { - target = total * tableSample.getSampleValue() / 100; - } else { - int columnSize = 0; - for (Column column : table.getFullSchema()) { - columnSize += column.getDataType().getSlotSize(); - } - target = columnSize * tableSample.getSampleValue(); - } - // Calculate the actual sample size (cumulate). - long cumulate = 0; - for (long size : chunkSizes) { - cumulate += size; - if (cumulate >= target) { - break; - } - } - return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate); - } - - @Override - protected void afterExecution() { - // Table level task doesn't need to sync any value to sync stats, it stores the value in metadata. - // Partition only task doesn't need to refresh cached. - if (isTableLevelTask || isPartitionOnly) { - return; - } - Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName()); - } - - /** - * If the size to sample is larger than LIMIT_SIZE (1GB) - * and is much larger (1.2*) than the size user want to sample, - * use limit to control the total sample size. - * @param sizeToRead The file size to sample. 
- * @param factor sizeToRead * factor = Table total size. - * @return True if need to limit. - */ - protected boolean needLimit(long sizeToRead, double factor) { - long total = (long) (sizeToRead * factor); - long target; - if (tableSample.isPercent()) { - target = total * tableSample.getSampleValue() / 100; - } else { - int columnSize = 0; - for (Column column : table.getFullSchema()) { - columnSize += column.getDataType().getSlotSize(); - } - target = columnSize * tableSample.getSampleValue(); - } - if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) { - return true; - } - return false; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index 5c8aec3fbf..6176ec13bd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -84,10 +84,12 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.types.Types; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; @@ -735,8 +737,12 @@ public class StatisticsUtil { columnStatisticBuilder.setDataSize(0); columnStatisticBuilder.setAvgSizeByte(0); columnStatisticBuilder.setNumNulls(0); - for (FileScanTask task : tableScan.planFiles()) { - processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder); + try (CloseableIterable fileScanTasks = tableScan.planFiles()) { + for (FileScanTask task : fileScanTasks) { + processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder); + } + } catch (IOException e) { + LOG.warn("Error to close FileScanTask.", e); } if (columnStatisticBuilder.getCount() > 0) { columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize() diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java index fb0a3b3c2c..e101686452 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/HMSAnalysisTaskTest.java @@ -252,7 +252,7 @@ public class HMSAnalysisTaskTest { analysisInfoBuilder.setUsingSqlForPartitionColumn(true); task.info = analysisInfoBuilder.build(); - task.getTableColumnStats(); + task.getOrdinaryColumnStats(); } @@ -309,6 +309,6 @@ public class HMSAnalysisTaskTest { analysisInfoBuilder.setUsingSqlForPartitionColumn(false); task.info = analysisInfoBuilder.build(); - task.getTableColumnStats(); + task.getOrdinaryColumnStats(); } } diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 39c41f7c11..6d17bd032f 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -191,6 +191,7 @@ extArrowFlightSqlPassword= "" // iceberg rest catalog config iceberg_rest_uri_port=18181 +iceberg_minio_port=19001 // If the failure suite num exceeds this config // all following suite will be skipped to fast quit the run. 
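Suites read these keys from the regression config through the suite context; a minimal Groovy sketch, mirroring test_iceberg_statistics.groovy below:

    // Inside a regression suite.
    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")   // 18181
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")     // 19001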
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_statistics.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_statistics.out new file mode 100644 index 0000000000..c094d17147 --- /dev/null +++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_statistics.out @@ -0,0 +1,39 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !s1 -- +city 1000 4 0 Beijing Shanghai 6973 +col_binary 1000 867 0 0 1111101100100001001 15356 +col_boolean 1000 2 0 0 1 1000 +col_byte 1000 251 0 -128 127 4000 +col_char 1000 963 0 ! zy@notj#fkedb($ 9348 +col_date 1000 3 0 1969-09-21 2969-02-03 4000 +col_decimal 1000 1006 0 4.028284 9999.512216 8000 +col_double 1000 990 0 0.005217837593576302 9.996285421163707 8000 +col_float 1000 995 0 0.013126845 9.99709 4000 +col_integer 1000 999 0 -21468189 2108484 4000 +col_long 1000 996 0 -92193877774291102 92127291905311066 8000 +col_short 1000 985 0 -32554 32525 4000 +col_string 1000 992 0 0 zx70Jyeb6TfQ1YUaIGC 10714 +col_timestamp 1000 4 0 1970-01-01 08:00:01.000001 1970-01-04 08:00:01.000001 8000 +col_timestamp_ntz 1000 4 0 2017-12-01 10:12:55.038194 2017-12-04 10:12:55.038194 8000 +col_varchar 1000 988 0 0 zvnZ6bBxh 10764 +id 1000 1001 0 -99567408 99854631 8000 + +-- !s2 -- +city 1000 4 0 Beijing Shanghai 6973 +col_binary 1000 867 0 0 1111101100100001001 15356 +col_boolean 1000 2 0 0 1 1000 +col_byte 1000 251 0 -128 127 4000 +col_char 1000 973 0 ! zy@notj#fkedb($ 9324 +col_date 1000 3 0 1969-09-21 2969-02-03 4000 +col_decimal 1000 1006 0 4.028284 9999.512216 8000 +col_double 1000 990 0 0.005217837593576302 9.996285421163707 8000 +col_float 1000 995 0 0.013126845 9.99709 4000 +col_integer 1000 999 0 -21468189 2108484 4000 +col_long 1000 996 0 -92193877774291102 92127291905311066 8000 +col_short 1000 985 0 -32554 32525 4000 +col_string 1000 992 0 0 zx70Jyeb6TfQ1YUaIGC 10714 +col_timestamp 1000 4 0 1970-01-01 08:00:01.000001 1970-01-04 08:00:01.000001 8000 +col_timestamp_ntz 1000 4 0 2017-12-01 10:12:55.038194 2017-12-04 10:12:55.038194 8000 +col_varchar 1000 988 0 0 zvnZ6bBxh 10764 +id 1000 1001 0 -99567408 99854631 8000 + diff --git a/regression-test/data/external_table_p0/paimon/test_paimon_statistics.out b/regression-test/data/external_table_p0/paimon/test_paimon_statistics.out new file mode 100644 index 0000000000..0f9f20d478 --- /dev/null +++ b/regression-test/data/external_table_p0/paimon/test_paimon_statistics.out @@ -0,0 +1,21 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !s1 -- +c1 2 2 0 1 10 2 +c10 2 2 0 10.1 100.1 16 +c11 2 2 0 11.10 110.10 16 +c12 2 2 0 2020-02-02 2020-03-02 8 +c13 2 2 0 130str 13str 11 +c14 2 2 0 140varchar 14varchar 19 +c15 2 2 0 a b 2 +c16 2 2 0 0 1 2 +c17 2 2 0 aaaa bbbb 8 +c18 2 2 0 2023-08-13 09:32:38.530000 2023-08-14 08:32:52.821000 16 +c2 2 2 0 2 20 2 +c3 2 2 0 3 30 4 +c4 2 2 0 4 40 4 +c5 2 2 0 5 50 8 +c6 2 2 0 6 60 8 +c7 2 2 0 7 70 16 +c8 2 2 0 8 80 16 +c9 2 2 0 9.1 90.1 8 + diff --git a/regression-test/data/external_table_p2/hive/test_hive_hudi_statistics.out b/regression-test/data/external_table_p2/hive/test_hive_hudi_statistics.out new file mode 100644 index 0000000000..66a36a81af --- /dev/null +++ b/regression-test/data/external_table_p2/hive/test_hive_hudi_statistics.out @@ -0,0 +1,16 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !s1 -- +_hoodie_commit_seqno 4 4 0 20230605145009209_0_1 20230801201335031_1_1 84 +_hoodie_commit_time 4 3 0 20230605145009209 20230801201335031 68 +_hoodie_file_name 4 4 0 65ffc5d9-397a-456e-a735-30f3ad37466f-0 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet 221 +_hoodie_partition_path 4 3 0 partitionId=2011-11-11/versionId=v_1 partitionId=2021-02-01/versionId=v_4 144 +_hoodie_record_key 4 3 0 rowId:row_1 rowId:row_4 44 +inttolong 4 2 0 0 1 16 +longtoint 4 3 0 1000000 1000004 32 +name 4 3 0 ashin john 15 +partitionid 4 3 0 2011-11-11 2021-02-01 40 +precomb 4 3 0 0 4 32 +rowid 4 3 0 row_1 row_4 20 +tobedeletedstr 4 3 0 toBeDel0 toBeDel4 32 +versionid 4 3 0 v_0 v_4 12 + diff --git a/regression-test/pipeline/p0/conf/regression-conf.groovy b/regression-test/pipeline/p0/conf/regression-conf.groovy index be70ab0f73..b99e21c4e7 100644 --- a/regression-test/pipeline/p0/conf/regression-conf.groovy +++ b/regression-test/pipeline/p0/conf/regression-conf.groovy @@ -96,6 +96,7 @@ kafka_port=19193 // iceberg test config iceberg_rest_uri_port=18181 +iceberg_minio_port=19001 enableEsTest=false es_6_port=19200 diff --git a/regression-test/pipeline/tpch/tpch-sf100/conf/regression-conf.groovy b/regression-test/pipeline/tpch/tpch-sf100/conf/regression-conf.groovy index 364a7103fe..5234ccc424 100644 --- a/regression-test/pipeline/tpch/tpch-sf100/conf/regression-conf.groovy +++ b/regression-test/pipeline/tpch/tpch-sf100/conf/regression-conf.groovy @@ -94,6 +94,7 @@ kafka_port=19193 // iceberg test config iceberg_rest_uri_port=18181 +iceberg_minio_port=19001 enableEsTest=false es_6_port=19200 diff --git a/regression-test/plugins/plugins_get_ids_from_proc.groovy b/regression-test/plugins/plugins_get_ids_from_proc.groovy new file mode 100644 index 0000000000..74a4d4d201 --- /dev/null +++ b/regression-test/plugins/plugins_get_ids_from_proc.groovy @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
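+// This plugin extends Suite with helpers that resolve the ids Doris assigns to
+// catalogs, databases and tables by walking the `show proc '/catalogs'` tree.
+// Typical usage from a suite (names are illustrative):
+//   def table_id = get_table_id("my_catalog", "my_db", "my_table")
+// The id can then be used to filter internal.__internal_schema.column_statistics.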
+ +import org.apache.doris.regression.suite.Suite + +Suite.metaClass.get_catalog_id = {String catalog_name /* param */ -> + String catalog_id; + def catalogs = sql """show proc '/catalogs'""" + for (catalog in catalogs) { + if (catalog[1].equals(catalog_name)) { + catalog_id = catalog[0] + break + } + } + log.info("get catalogid: " + catalog_id) + return catalog_id +} + + +Suite.metaClass.get_database_id = {String catalog_name, String db_name /* param */ -> + String database_id; + def catalog_id = get_catalog_id(catalog_name) + def dbs = sql """show proc '/catalogs/${catalog_id}'""" + for (db in dbs) { + if (db[1].equals(db_name)) { + database_id = db[0] + break + } + } + log.info("get database_id: " + database_id) + return database_id +} + + +Suite.metaClass.get_table_id = {String catalog_name, String db_name, String tb_name /* param */ -> + String table_id; + def catalog_id = get_catalog_id(catalog_name) + def database_id = get_database_id(catalog_name, db_name) + def tbs = sql """show proc '/catalogs/${catalog_id}/${database_id}'""" + for (tb in tbs) { + if (tb[1].equals(tb_name)) { + table_id = tb[0] + break + } + } + log.info("get table_id: " + table_id) + return table_id +} diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_statistics.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_statistics.groovy new file mode 100644 index 0000000000..24b27eb70b --- /dev/null +++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_statistics.groovy @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
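+// This suite creates a REST catalog backed by the dockerized Iceberg service
+// (MinIO as the S3 endpoint), runs ANALYZE ... WITH SYNC on two sample tables,
+// and compares the rows persisted in internal.__internal_schema.column_statistics
+// with the expected .out output above.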
+ +suite("test_iceberg_statistics", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enableIcebergTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port") + String minio_port = context.config.otherConfigs.get("iceberg_minio_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String catalog_name = "test_iceberg_rest_catalog" + String db_name = "format_v2" + + sql """drop catalog if exists ${catalog_name}""" + sql """CREATE CATALOG ${catalog_name} PROPERTIES ( + 'type'='iceberg', + 'iceberg.catalog.type'='rest', + 'uri' = 'http://${externalEnvIp}:${rest_port}', + "s3.access_key" = "admin", + "s3.secret_key" = "password", + "s3.endpoint" = "http://${externalEnvIp}:${minio_port}", + "s3.region" = "us-east-1" + );""" + + def table_id_mor = get_table_id(catalog_name, db_name, "sample_mor_parquet") + def table_id_cow = get_table_id(catalog_name, db_name, "sample_cow_parquet") + + // analyze + sql """use `${catalog_name}`.`${db_name}`""" + sql """analyze table sample_mor_parquet with sync""" + sql """analyze table sample_cow_parquet with sync""" + + // select + def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id_mor} order by id;""" + def s2 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id_cow} order by id;""" + + qt_s1 s1 + qt_s2 s2 + } finally { + } + } +} + diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy index 87ea14ad2f..ce8c9b5e84 100644 --- a/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_catalog.groovy @@ -52,6 +52,9 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_ ); """ + sql """drop catalog ${file_ctl_name}"""; + sql """drop catalog ${hms_ctl_name}"""; + String enabled = context.config.otherConfigs.get("enablePaimonTest") if (enabled != null && enabled.equalsIgnoreCase("true")) { def all = """select * from all_table order by c1;""" diff --git a/regression-test/suites/external_table_p0/paimon/test_paimon_statistics.groovy b/regression-test/suites/external_table_p0/paimon/test_paimon_statistics.groovy new file mode 100644 index 0000000000..c75e7b797d --- /dev/null +++ b/regression-test/suites/external_table_p0/paimon/test_paimon_statistics.groovy @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_paimon_statistics", "p0,external,doris,external_docker,external_docker_doris") { + String enabled = context.config.otherConfigs.get("enablePaimonTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + try { + String hdfs_port = context.config.otherConfigs.get("hdfs_port") + String catalog_name = "paimon1" + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + + sql """drop catalog if exists ${catalog_name}""" + sql """create catalog if not exists ${catalog_name} properties ( + "type" = "paimon", + "paimon.catalog.type"="filesystem", + "warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1" + );""" + + def table_id = get_table_id(catalog_name, "db1", "all_table") + + // analyze + sql """use `${catalog_name}`.`db1`""" + sql """analyze table all_table with sync""" + + // select + def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id} order by id;""" + + qt_s1 s1 + } finally { + } + } +} + diff --git a/regression-test/suites/external_table_p0/test_catalog_ddl.groovy b/regression-test/suites/external_table_p0/test_catalog_ddl.groovy index b236567e8b..a9a67f5185 100644 --- a/regression-test/suites/external_table_p0/test_catalog_ddl.groovy +++ b/regression-test/suites/external_table_p0/test_catalog_ddl.groovy @@ -44,4 +44,6 @@ suite("test_catalog_ddl", "p0,external,external_docker") { result = sql """show create catalog ${catalog1};""" assertEquals(result.size(), 1) assertTrue(result[0][1].contains("COMMENT \"alter_comment\"")) + + sql """drop catalog ${catalog1}""" } diff --git a/regression-test/suites/external_table_p2/hive/test_hive_hudi_statistics.groovy b/regression-test/suites/external_table_p2/hive/test_hive_hudi_statistics.groovy new file mode 100644 index 0000000000..55e5037de4 --- /dev/null +++ b/regression-test/suites/external_table_p2/hive/test_hive_hudi_statistics.groovy @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_hive_hudi_statistics", "p2,external,hive,hudi") { + String enabled = context.config.otherConfigs.get("enableExternalHiveTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost") + String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort") + String catalog_name = "test_hive_hudi_statistics" + String db_name = "hudi_catalog" + + sql """drop catalog if exists ${catalog_name};""" + sql """ + create catalog if not exists ${catalog_name} properties ( + 'hadoop.username'='hadoop', + 'type'='hms', + 'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}' + ); + """ + + def table_id = get_table_id(catalog_name, db_name, "partitioned_mor_rt") + + // analyze + sql """use `${catalog_name}`.`${db_name}`""" + sql """analyze table partitioned_mor_rt with sync""" + + // select + def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id} order by id;""" + + qt_s1 s1 + + } +}