[feature](statistics) support statistics for iceberg/paimon/hudi table (#29868)

This commit is contained in:
wuwenchi
2024-01-17 17:43:05 +08:00
committed by yiguolei
parent ade720470d
commit 44ba9e102c
21 changed files with 621 additions and 237 deletions

View File

@@ -21,4 +21,4 @@ SPARK_DRIVER_UI_PORT=8080
SPARK_HISTORY_UI_PORT=10000
REST_CATALOG_PORT=18181
MINIO_UI_PORT=9000
MINIO_API_PORT=9001
MINIO_API_PORT=19001

View File

@@ -58,6 +58,8 @@ services:
minio:
image: minio/minio
container_name: doris--minio
ports:
- ${MINIO_API_PORT}:9000
environment:
- MINIO_ROOT_USER=admin
- MINIO_ROOT_PASSWORD=password

View File

@@ -23,7 +23,10 @@ import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TIcebergTable;
@@ -149,4 +152,10 @@ public class IcebergExternalTable extends ExternalTable {
() -> StatisticsUtil.getIcebergColumnStats(colName,
((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name)));
}
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
}

View File

@@ -21,6 +21,9 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ExternalAnalysisTask;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -154,4 +157,10 @@ public class PaimonExternalTable extends ExternalTable {
+ getPaimonCatalogType());
}
}
@Override
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
makeSureInitialized();
return new ExternalAnalysisTask(info);
}
}
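
Both IcebergExternalTable and PaimonExternalTable now answer ANALYZE with the same shared task class; only the table-specific initialization differs. A minimal sketch of the factory-method shape involved, with stub types standing in for the Doris classes:

interface Task {
    void execute() throws Exception;
}

class SharedExternalTask implements Task {
    @Override
    public void execute() {
        // SQL-based stats collection, as in ExternalAnalysisTask below
    }
}

abstract class TableSketch {
    void makeSureInitialized() { /* lazy metadata load, stubbed */ }

    // Each table flavor returns the task that knows how to analyze it.
    abstract Task createAnalysisTask();
}

class IcebergTableSketch extends TableSketch {
    @Override
    Task createAnalysisTask() {
        makeSureInitialized();
        return new SharedExternalTask(); // Paimon's override is identical
    }
}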

View File

@@ -27,6 +27,7 @@ import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
@@ -37,6 +38,7 @@ import java.util.List;
* show all catalogs' info
*/
public class CatalogsProcDir implements ProcDirInterface {
private static final Logger LOG = Logger.getLogger(CatalogsProcDir.class);
public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
.add("CatalogIds").add("CatalogName").add("DatabaseNum").add("LastUpdateTime")
.build();
@@ -90,7 +92,13 @@ public class CatalogsProcDir implements ProcDirInterface {
List<Comparable> catalogInfo = Lists.newArrayList();
catalogInfo.add(catalog.getId());
catalogInfo.add(catalog.getName());
catalogInfo.add(catalog.getDbNames().size());
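// getDbNames() may call out to a remote metastore; keep -1 as a sentinel so
// `show proc '/catalogs'` still renders a row when a catalog is unreachable.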
int size = -1;
try {
size = catalog.getDbNames().size();
} catch (Exception e) {
LOG.warn("failed to get database: ", e);
}
catalogInfo.add(size);
catalogInfo.add(TimeUtils.longToTimeString(catalog.getLastUpdateTime()));
catalogInfos.add(catalogInfo);
}

View File

@@ -0,0 +1,263 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public class ExternalAnalysisTask extends BaseAnalysisTask {
private static final Logger LOG = LogManager.getLogger(ExternalAnalysisTask.class);
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}";
private boolean isTableLevelTask;
private boolean isPartitionOnly;
private ExternalTable table;
// For test
public ExternalAnalysisTask() {
}
public ExternalAnalysisTask(AnalysisInfo info) {
super(info);
isTableLevelTask = info.externalTableLevelTask;
isPartitionOnly = info.partitionOnly;
table = (ExternalTable) tbl;
}
public void doExecute() throws Exception {
if (isTableLevelTask) {
getTableStats();
} else {
getOrdinaryColumnStats();
}
}
// For test
protected void setTable(ExternalTable table) {
this.table = table;
}
/**
* Get table row count
*/
private void getTableStats() {
Map<String, String> params = buildStatsParams(null);
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(ANALYZE_TABLE_COUNT_TEMPLATE));
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
job.rowCountDone(this);
}
// Get ordinary column stats
protected void getOrdinaryColumnStats() throws Exception {
StringBuilder sb = new StringBuilder();
Map<String, String> params = buildStatsParams("NULL");
params.put("min", getMinFunction());
params.put("max", getMaxFunction());
params.put("dataSizeFunction", getDataSizeFunction(col, false));
Pair<Double, Long> sampleInfo = getSampleInfo();
params.put("scaleFactor", String.valueOf(sampleInfo.first));
StringSubstitutor stringSubstitutor;
if (tableSample == null) {
// Do full analyze
LOG.debug("Will do full collection for column {}", col.getName());
sb.append(COLLECT_COL_STATISTICS);
} else {
// Do sample analyze
LOG.debug("Will do sample collection for column {}", col.getName());
boolean limitFlag = false;
boolean bucketFlag = false;
// If sample size is too large, use limit to control the sample size.
if (needLimit(sampleInfo.second, sampleInfo.first)) {
limitFlag = true;
long columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
double targetRows = (double) sampleInfo.second / columnSize;
// Estimate the new scaleFactor based on the schema.
if (targetRows > StatisticsUtil.getHugeTableSampleRows()) {
params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows());
params.put("scaleFactor",
String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows()));
}
}
// A single distribution column is not a good fit for the DUJ1 estimator; use the linear estimator.
Set<String> distributionColumns = tbl.getDistributionColumnNames();
if (distributionColumns.size() == 1 && distributionColumns.contains(col.getName().toLowerCase())) {
bucketFlag = true;
sb.append(LINEAR_ANALYZE_TEMPLATE);
params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
params.put("rowCount", "ROUND(count(1) * ${scaleFactor})");
} else {
sb.append(DUJ1_ANALYZE_TEMPLATE);
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})"));
params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})");
}
LOG.info("Sample for column [{}]. Scale factor [{}], "
+ "limited [{}], is distribute column [{}]",
col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag);
}
stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
runQuery(sql);
}
protected Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
if (partId == null) {
commonParams.put("partId", "NULL");
} else {
id = StatisticsUtil.constructId(id, partId);
commonParams.put("partId", "\'" + partId + "\'");
}
commonParams.put("internalDB", FeConstants.INTERNAL_DB_NAME);
commonParams.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
commonParams.put("id", id);
commonParams.put("catalogId", String.valueOf(catalog.getId()));
commonParams.put("dbId", String.valueOf(db.getId()));
commonParams.put("tblId", String.valueOf(tbl.getId()));
commonParams.put("indexId", "-1");
commonParams.put("idxId", "-1");
commonParams.put("colName", info.colName);
commonParams.put("colId", info.colName);
commonParams.put("catalogName", catalog.getName());
commonParams.put("dbName", db.getFullName());
commonParams.put("tblName", tbl.getName());
commonParams.put("sampleHints", getSampleHint());
commonParams.put("limit", "");
commonParams.put("scaleFactor", "1");
if (col != null) {
commonParams.put("type", col.getType().toString());
}
commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
return commonParams;
}
protected String getSampleHint() {
if (tableSample == null) {
return "";
}
if (tableSample.isPercent()) {
return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue());
} else {
return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue());
}
}
/**
* Get the pair of sample scale factor and the file size going to sample.
* While analyzing, the count, null count and data size results need to be
* multiplied by this scale factor to get a more accurate result.
* @return Pair of sample scale factor and the file size going to sample.
*/
protected Pair<Double, Long> getSampleInfo() {
if (tableSample == null) {
return Pair.of(1.0, 0L);
}
long target;
// Get list of all files' size in this HMS table.
List<Long> chunkSizes = table.getChunkSizes();
Collections.shuffle(chunkSizes, new Random(tableSample.getSeek()));
long total = 0;
// Calculate the total size of this HMS table.
for (long size : chunkSizes) {
total += size;
}
if (total == 0) {
return Pair.of(1.0, 0L);
}
// Calculate the sample target size for percent and rows sample.
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
// Calculate the actual sample size (cumulate).
long cumulate = 0;
for (long size : chunkSizes) {
cumulate += size;
if (cumulate >= target) {
break;
}
}
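// Worked example (made-up sizes): chunks sum to total = 1000 bytes and
// TABLESAMPLE(20 PERCENT) gives target = 200; if the first shuffled chunk is
// 300 bytes, cumulate = 300 and the factor is max(1000 / 300, 1) ~= 3.33,
// scaling counts measured on the sample back up to full-table scale.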
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
}
@Override
protected void afterExecution() {
// A table-level task doesn't need to sync any value to the stats cache; it stores the value in metadata.
// A partition-only task doesn't need to refresh the cache.
if (isTableLevelTask || isPartitionOnly) {
return;
}
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}
/**
* If the size to sample is larger than LIMIT_SIZE (1GB)
* and is much larger (1.2x) than the size the user wants to sample,
* use limit to control the total sample size.
* @param sizeToRead The file size to sample.
* @param factor sizeToRead * factor = Table total size.
* @return True if need to limit.
*/
protected boolean needLimit(long sizeToRead, double factor) {
long total = (long) (sizeToRead * factor);
long target;
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
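// Example (made-up sizes): sizeToRead = 5 GB with factor = 4 implies a 20 GB
// table; a 10 PERCENT sample then targets 2 GB. 5 GB > LIMIT_SIZE (1 GB) and
// 5 GB > 1.2 * 2 GB, so the check below is true and a LIMIT is applied.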
if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) {
return true;
}
return false;
}
}
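
The ${...} keys in the SQL templates above are filled by StringSubstitutor from the map built in buildStatsParams. A minimal, self-contained sketch of that mechanism: the template string is copied from ANALYZE_TABLE_COUNT_TEMPLATE above, while the catalog, database and table names are hypothetical.

import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

public class RowCountTemplateDemo {
    public static void main(String[] args) {
        String template = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
                + "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}";
        Map<String, String> params = new HashMap<>();
        params.put("scaleFactor", "1");            // "1" when no sampling is in effect
        params.put("sampleHints", "");             // empty when tableSample == null
        params.put("catalogName", "demo_catalog"); // hypothetical identifiers
        params.put("dbName", "db1");
        params.put("tblName", "t1");
        // Prints: SELECT ROUND(COUNT(1) * 1) as rowCount FROM `demo_catalog`.`db1`.`t1`
        System.out.println(new StringSubstitutor(params).replace(template).trim());
    }
}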

View File

@@ -17,12 +17,10 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -32,64 +30,36 @@ import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
public class HMSAnalysisTask extends BaseAnalysisTask {
public class HMSAnalysisTask extends ExternalAnalysisTask {
private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);
private HMSExternalTable hmsExternalTable;
private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleHints}";
private boolean isTableLevelTask;
private boolean isPartitionOnly;
private HMSExternalTable table;
// for test
public HMSAnalysisTask() {
}
public HMSAnalysisTask(AnalysisInfo info) {
super(info);
isTableLevelTask = info.externalTableLevelTask;
isPartitionOnly = info.partitionOnly;
table = (HMSExternalTable) tbl;
hmsExternalTable = (HMSExternalTable) tbl;
}
public void doExecute() throws Exception {
if (isTableLevelTask) {
getTableStats();
} else {
getTableColumnStats();
}
private boolean isPartitionColumn() {
return hmsExternalTable.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName()));
}
// For test
protected void setTable(HMSExternalTable table) {
this.table = table;
setTable((ExternalTable) table);
this.hmsExternalTable = table;
}
/**
* Get table row count
*/
private void getTableStats() throws Exception {
Map<String, String> params = buildStatsParams(null);
List<ResultRow> columnResult =
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
.replace(ANALYZE_TABLE_COUNT_TEMPLATE));
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStatsMeta(Long.parseLong(rowCount), info, tbl));
job.rowCountDone(this);
}
/**
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
protected void getTableColumnStats() throws Exception {
@Override
protected void getOrdinaryColumnStats() throws Exception {
if (!info.usingSqlForPartitionColumn) {
try {
if (isPartitionColumn()) {
@@ -102,77 +72,17 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
+ "fallback to normal collection",
isPartitionColumn() ? "partition " : "", col.getName(), e);
/* retry using sql way! */
getOrdinaryColumnStats();
super.getOrdinaryColumnStats();
}
} else {
getOrdinaryColumnStats();
super.getOrdinaryColumnStats();
}
}
private boolean isPartitionColumn() {
return table.getPartitionColumns().stream().anyMatch(c -> c.getName().equals(col.getName()));
}
// Get ordinary column stats. An ordinary column is a non-partition column.
private void getOrdinaryColumnStats() throws Exception {
StringBuilder sb = new StringBuilder();
Map<String, String> params = buildStatsParams("NULL");
params.put("min", getMinFunction());
params.put("max", getMaxFunction());
params.put("dataSizeFunction", getDataSizeFunction(col, false));
Pair<Double, Long> sampleInfo = getSampleInfo();
params.put("scaleFactor", String.valueOf(sampleInfo.first));
StringSubstitutor stringSubstitutor;
if (tableSample == null) {
// Do full analyze
LOG.debug("Will do full collection for column {}", col.getName());
sb.append(COLLECT_COL_STATISTICS);
} else {
// Do sample analyze
LOG.debug("Will do sample collection for column {}", col.getName());
boolean limitFlag = false;
boolean bucketFlag = false;
// If sample size is too large, use limit to control the sample size.
if (needLimit(sampleInfo.second, sampleInfo.first)) {
limitFlag = true;
long columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
double targetRows = (double) sampleInfo.second / columnSize;
// Estimate the new scaleFactor based on the schema.
if (targetRows > StatisticsUtil.getHugeTableSampleRows()) {
params.put("limit", "limit " + StatisticsUtil.getHugeTableSampleRows());
params.put("scaleFactor",
String.valueOf(sampleInfo.first * targetRows / StatisticsUtil.getHugeTableSampleRows()));
}
}
// A single distribution column is not a good fit for the DUJ1 estimator; use the linear estimator.
Set<String> distributionColumns = tbl.getDistributionColumnNames();
if (distributionColumns.size() == 1 && distributionColumns.contains(col.getName().toLowerCase())) {
bucketFlag = true;
sb.append(LINEAR_ANALYZE_TEMPLATE);
params.put("ndvFunction", "ROUND(NDV(`${colName}`) * ${scaleFactor})");
params.put("rowCount", "ROUND(count(1) * ${scaleFactor})");
} else {
sb.append(DUJ1_ANALYZE_TEMPLATE);
params.put("dataSizeFunction", getDataSizeFunction(col, true));
params.put("ndvFunction", getNdvFunction("ROUND(SUM(t1.count) * ${scaleFactor})"));
params.put("rowCount", "ROUND(SUM(t1.count) * ${scaleFactor})");
}
LOG.info("Sample for column [{}]. Scale factor [{}], "
+ "limited [{}], is distribute column [{}]",
col.getName(), params.get("scaleFactor"), limitFlag, bucketFlag);
}
stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
runQuery(sql);
}
// Collect the partition column stats through HMS metadata.
// Get all the partition values and calculate the stats based on the values.
private void getPartitionColumnStats() throws Exception {
Set<String> partitionNames = table.getPartitionNames();
Set<String> partitionNames = hmsExternalTable.getPartitionNames();
Set<String> ndvPartValues = Sets.newHashSet();
long numNulls = 0;
long dataSize = 0;
@@ -198,8 +108,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}
// Estimate the row count. This value is inaccurate if the table stats are empty.
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager()
.findTableStatsStatus(hmsExternalTable.getId());
long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount;
dataSize = dataSize * count / partitionNames.size();
numNulls = numNulls * count / partitionNames.size();
int ndv = ndvPartValues.size();
@@ -218,8 +129,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
// Collect the spark analyzed column stats through HMS metadata.
private void getHmsColumnStats() throws Exception {
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
long count = tableStatsStatus == null ? table.estimatedRowCount() : tableStatsStatus.rowCount;
TableStatsMeta tableStatsStatus = Env.getCurrentEnv().getAnalysisManager()
.findTableStatsStatus(hmsExternalTable.getId());
long count = tableStatsStatus == null ? hmsExternalTable.estimatedRowCount() : tableStatsStatus.rowCount;
Map<String, String> params = buildStatsParams("NULL");
Map<StatsType, String> statsParams = new HashMap<>();
@@ -229,7 +141,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
statsParams.put(StatsType.MAX_VALUE, "max");
statsParams.put(StatsType.AVG_SIZE, "avg_len");
if (table.fillColumnStatistics(info.colName, statsParams, params)) {
if (hmsExternalTable.fillColumnStatistics(info.colName, statsParams, params)) {
throw new AnalysisException("some column stats not available");
}
@@ -283,126 +195,4 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
return value.compareTo(currentMax) > 0 ? value : currentMax;
}
private Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
if (partId == null) {
commonParams.put("partId", "NULL");
} else {
id = StatisticsUtil.constructId(id, partId);
commonParams.put("partId", "\'" + partId + "\'");
}
commonParams.put("internalDB", FeConstants.INTERNAL_DB_NAME);
commonParams.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
commonParams.put("id", id);
commonParams.put("catalogId", String.valueOf(catalog.getId()));
commonParams.put("dbId", String.valueOf(db.getId()));
commonParams.put("tblId", String.valueOf(tbl.getId()));
commonParams.put("indexId", "-1");
commonParams.put("idxId", "-1");
commonParams.put("colName", info.colName);
commonParams.put("colId", info.colName);
commonParams.put("catalogName", catalog.getName());
commonParams.put("dbName", db.getFullName());
commonParams.put("tblName", tbl.getName());
commonParams.put("sampleHints", getSampleHint());
commonParams.put("limit", "");
commonParams.put("scaleFactor", "1");
if (col != null) {
commonParams.put("type", col.getType().toString());
}
commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
return commonParams;
}
protected String getSampleHint() {
if (tableSample == null) {
return "";
}
if (tableSample.isPercent()) {
return String.format("TABLESAMPLE(%d PERCENT)", tableSample.getSampleValue());
} else {
return String.format("TABLESAMPLE(%d ROWS)", tableSample.getSampleValue());
}
}
/**
* Get the pair of sample scale factor and the file size going to sample.
* While analyzing, the count, null count and data size results need to be
* multiplied by this scale factor to get a more accurate result.
* @return Pair of sample scale factor and the file size going to sample.
*/
protected Pair<Double, Long> getSampleInfo() {
if (tableSample == null) {
return Pair.of(1.0, 0L);
}
long target;
// Get list of all files' size in this HMS table.
List<Long> chunkSizes = table.getChunkSizes();
Collections.shuffle(chunkSizes, new Random(tableSample.getSeek()));
long total = 0;
// Calculate the total size of this HMS table.
for (long size : chunkSizes) {
total += size;
}
if (total == 0) {
return Pair.of(1.0, 0L);
}
// Calculate the sample target size for percent and rows sample.
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
// Calculate the actual sample size (cumulate).
long cumulate = 0;
for (long size : chunkSizes) {
cumulate += size;
if (cumulate >= target) {
break;
}
}
return Pair.of(Math.max(((double) total) / cumulate, 1), cumulate);
}
@Override
protected void afterExecution() {
// A table-level task doesn't need to sync any value to the stats cache; it stores the value in metadata.
// A partition-only task doesn't need to refresh the cache.
if (isTableLevelTask || isPartitionOnly) {
return;
}
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}
/**
* If the size to sample is larger than LIMIT_SIZE (1GB)
* and is much larger (1.2x) than the size the user wants to sample,
* use limit to control the total sample size.
* @param sizeToRead The file size to sample.
* @param factor sizeToRead * factor = Table total size.
* @return True if need to limit.
*/
protected boolean needLimit(long sizeToRead, double factor) {
long total = (long) (sizeToRead * factor);
long target;
if (tableSample.isPercent()) {
target = total * tableSample.getSampleValue() / 100;
} else {
int columnSize = 0;
for (Column column : table.getFullSchema()) {
columnSize += column.getDataType().getSlotSize();
}
target = columnSize * tableSample.getSampleValue();
}
if (sizeToRead > LIMIT_SIZE && sizeToRead > target * LIMIT_FACTOR) {
return true;
}
return false;
}
}
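
With this refactor, HMSAnalysisTask keeps only the HMS-specific paths (partition values and Spark-written column stats) and inherits the generic SQL path from ExternalAnalysisTask. A minimal sketch of the resulting try-metadata-then-fall-back shape, using stub names rather than the Doris classes:

class ExternalTaskSketch {
    protected void getOrdinaryColumnStats() throws Exception {
        // generic SQL-based collection, as in ExternalAnalysisTask above
    }
}

class HmsTaskSketch extends ExternalTaskSketch {
    private boolean usingSqlForPartitionColumn; // mirrors info.usingSqlForPartitionColumn
    private boolean partitionColumn;            // mirrors isPartitionColumn()

    @Override
    protected void getOrdinaryColumnStats() throws Exception {
        if (!usingSqlForPartitionColumn) {
            try {
                if (partitionColumn) {
                    // derive stats from HMS partition metadata, no table scan
                    return;
                }
                // non-partition columns can try Spark-written HMS stats here
            } catch (Exception e) {
                // metadata incomplete or unavailable: fall back to SQL below
            }
        }
        super.getOrdinaryColumnStats(); // shared SQL path
    }
}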

View File

@@ -84,10 +84,12 @@ import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
@@ -735,8 +737,12 @@ public class StatisticsUtil {
columnStatisticBuilder.setDataSize(0);
columnStatisticBuilder.setAvgSizeByte(0);
columnStatisticBuilder.setNumNulls(0);
for (FileScanTask task : tableScan.planFiles()) {
processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder);
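// planFiles() returns a CloseableIterable that holds open manifest readers;
// try-with-resources guarantees it is closed even if iteration throws.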
try (CloseableIterable<FileScanTask> fileScanTasks = tableScan.planFiles()) {
for (FileScanTask task : fileScanTasks) {
processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder);
}
} catch (IOException e) {
LOG.warn("Error to close FileScanTask.", e);
}
if (columnStatisticBuilder.getCount() > 0) {
columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()

View File

@@ -252,7 +252,7 @@ public class HMSAnalysisTaskTest {
analysisInfoBuilder.setUsingSqlForPartitionColumn(true);
task.info = analysisInfoBuilder.build();
task.getTableColumnStats();
task.getOrdinaryColumnStats();
}
@@ -309,6 +309,6 @@ public class HMSAnalysisTaskTest {
analysisInfoBuilder.setUsingSqlForPartitionColumn(false);
task.info = analysisInfoBuilder.build();
task.getTableColumnStats();
task.getOrdinaryColumnStats();
}
}

View File

@@ -191,6 +191,7 @@ extArrowFlightSqlPassword= ""
// iceberg rest catalog config
iceberg_rest_uri_port=18181
iceberg_minio_port=19001
// If the failure suite num exceeds this config
// all following suite will be skipped to fast quit the run.

View File

@@ -0,0 +1,39 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !s1 --
city 1000 4 0 Beijing Shanghai 6973
col_binary 1000 867 0 0 1111101100100001001 15356
col_boolean 1000 2 0 0 1 1000
col_byte 1000 251 0 -128 127 4000
col_char 1000 963 0 ! zy@notj#fkedb($ 9348
col_date 1000 3 0 1969-09-21 2969-02-03 4000
col_decimal 1000 1006 0 4.028284 9999.512216 8000
col_double 1000 990 0 0.005217837593576302 9.996285421163707 8000
col_float 1000 995 0 0.013126845 9.99709 4000
col_integer 1000 999 0 -21468189 2108484 4000
col_long 1000 996 0 -92193877774291102 92127291905311066 8000
col_short 1000 985 0 -32554 32525 4000
col_string 1000 992 0 0 zx70Jyeb6TfQ1YUaIGC 10714
col_timestamp 1000 4 0 1970-01-01 08:00:01.000001 1970-01-04 08:00:01.000001 8000
col_timestamp_ntz 1000 4 0 2017-12-01 10:12:55.038194 2017-12-04 10:12:55.038194 8000
col_varchar 1000 988 0 0 zvnZ6bBxh 10764
id 1000 1001 0 -99567408 99854631 8000
-- !s2 --
city 1000 4 0 Beijing Shanghai 6973
col_binary 1000 867 0 0 1111101100100001001 15356
col_boolean 1000 2 0 0 1 1000
col_byte 1000 251 0 -128 127 4000
col_char 1000 973 0 ! zy@notj#fkedb($ 9324
col_date 1000 3 0 1969-09-21 2969-02-03 4000
col_decimal 1000 1006 0 4.028284 9999.512216 8000
col_double 1000 990 0 0.005217837593576302 9.996285421163707 8000
col_float 1000 995 0 0.013126845 9.99709 4000
col_integer 1000 999 0 -21468189 2108484 4000
col_long 1000 996 0 -92193877774291102 92127291905311066 8000
col_short 1000 985 0 -32554 32525 4000
col_string 1000 992 0 0 zx70Jyeb6TfQ1YUaIGC 10714
col_timestamp 1000 4 0 1970-01-01 08:00:01.000001 1970-01-04 08:00:01.000001 8000
col_timestamp_ntz 1000 4 0 2017-12-01 10:12:55.038194 2017-12-04 10:12:55.038194 8000
col_varchar 1000 988 0 0 zvnZ6bBxh 10764
id 1000 1001 0 -99567408 99854631 8000

View File

@@ -0,0 +1,21 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !s1 --
c1 2 2 0 1 10 2
c10 2 2 0 10.1 100.1 16
c11 2 2 0 11.10 110.10 16
c12 2 2 0 2020-02-02 2020-03-02 8
c13 2 2 0 130str 13str 11
c14 2 2 0 140varchar 14varchar 19
c15 2 2 0 a b 2
c16 2 2 0 0 1 2
c17 2 2 0 aaaa bbbb 8
c18 2 2 0 2023-08-13 09:32:38.530000 2023-08-14 08:32:52.821000 16
c2 2 2 0 2 20 2
c3 2 2 0 3 30 4
c4 2 2 0 4 40 4
c5 2 2 0 5 50 8
c6 2 2 0 6 60 8
c7 2 2 0 7 70 16
c8 2 2 0 8 80 16
c9 2 2 0 9.1 90.1 8

View File

@@ -0,0 +1,16 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !s1 --
_hoodie_commit_seqno 4 4 0 20230605145009209_0_1 20230801201335031_1_1 84
_hoodie_commit_time 4 3 0 20230605145009209 20230801201335031 68
_hoodie_file_name 4 4 0 65ffc5d9-397a-456e-a735-30f3ad37466f-0 e33d645c-6e2f-41f3-b8d6-f658771bd460-0_1-83-220_20230605145403388.parquet 221
_hoodie_partition_path 4 3 0 partitionId=2011-11-11/versionId=v_1 partitionId=2021-02-01/versionId=v_4 144
_hoodie_record_key 4 3 0 rowId:row_1 rowId:row_4 44
inttolong 4 2 0 0 1 16
longtoint 4 3 0 1000000 1000004 32
name 4 3 0 ashin john 15
partitionid 4 3 0 2011-11-11 2021-02-01 40
precomb 4 3 0 0 4 32
rowid 4 3 0 row_1 row_4 20
tobedeletedstr 4 3 0 toBeDel0 toBeDel4 32
versionid 4 3 0 v_0 v_4 12

View File

@@ -96,6 +96,7 @@ kafka_port=19193
// iceberg test config
iceberg_rest_uri_port=18181
iceberg_minio_port=19001
enableEsTest=false
es_6_port=19200

View File

@@ -94,6 +94,7 @@ kafka_port=19193
// iceberg test config
iceberg_rest_uri_port=18181
iceberg_minio_port=19001
enableEsTest=false
es_6_port=19200

View File

@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.Suite
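// Shared helpers for the new statistics suites: walk the `show proc` tree
// (/catalogs -> /catalogs/<catalog_id> -> tables) to resolve the numeric ids
// the suites use to filter internal.__internal_schema.column_statistics by tbl_id.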
Suite.metaClass.get_catalog_id = {String catalog_name /* param */ ->
String catalog_id;
def catalogs = sql """show proc '/catalogs'"""
for (catalog in catalogs) {
if (catalog[1].equals(catalog_name)) {
catalog_id = catalog[0]
break
}
}
log.info("get catalogid: " + catalog_id)
return catalog_id
}
Suite.metaClass.get_database_id = {String catalog_name, String db_name /* param */ ->
String database_id;
def catalog_id = get_catalog_id(catalog_name)
def dbs = sql """show proc '/catalogs/${catalog_id}'"""
for (db in dbs) {
if (db[1].equals(db_name)) {
database_id = db[0]
break
}
}
log.info("get database_id: " + database_id)
return database_id
}
Suite.metaClass.get_table_id = {String catalog_name, String db_name, String tb_name /* param */ ->
String table_id;
def catalog_id = get_catalog_id(catalog_name)
def database_id = get_database_id(catalog_name, db_name)
def tbs = sql """show proc '/catalogs/${catalog_id}/${database_id}'"""
for (tb in tbs) {
if (tb[1].equals(tb_name)) {
table_id = tb[0]
break
}
}
log.info("get table_id: " + table_id)
return table_id
}

View File

@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_iceberg_statistics", "p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enableIcebergTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String catalog_name = "test_iceberg_rest_catalog"
String db_name = "format_v2"
sql """drop catalog if exists ${catalog_name}"""
sql """CREATE CATALOG ${catalog_name} PROPERTIES (
'type'='iceberg',
'iceberg.catalog.type'='rest',
'uri' = 'http://${externalEnvIp}:${rest_port}',
"s3.access_key" = "admin",
"s3.secret_key" = "password",
"s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
"s3.region" = "us-east-1"
);"""
def table_id_mor = get_table_id(catalog_name, db_name, "sample_mor_parquet")
def table_id_cow = get_table_id(catalog_name, db_name, "sample_cow_parquet")
// analyze
sql """use `${catalog_name}`.`${db_name}`"""
sql """analyze table sample_mor_parquet with sync"""
sql """analyze table sample_cow_parquet with sync"""
// select
def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id_mor} order by id;"""
def s2 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id_cow} order by id;"""
qt_s1 s1
qt_s2 s2
} finally {
}
}
}

View File

@@ -52,6 +52,9 @@ suite("test_paimon_catalog", "p0,external,doris,external_docker,external_docker_
);
"""
sql """drop catalog ${file_ctl_name}""";
sql """drop catalog ${hms_ctl_name}""";
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
def all = """select * from all_table order by c1;"""

View File

@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_paimon_statistics", "p0,external,doris,external_docker,external_docker_doris") {
String enabled = context.config.otherConfigs.get("enablePaimonTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
try {
String hdfs_port = context.config.otherConfigs.get("hdfs_port")
String catalog_name = "paimon1"
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
sql """drop catalog if exists ${catalog_name}"""
sql """create catalog if not exists ${catalog_name} properties (
"type" = "paimon",
"paimon.catalog.type"="filesystem",
"warehouse" = "hdfs://${externalEnvIp}:${hdfs_port}/user/doris/paimon1"
);"""
def table_id = get_table_id(catalog_name, "db1", "all_table")
// analyze
sql """use `${catalog_name}`.`db1`"""
sql """analyze table all_table with sync"""
// select
def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id} order by id;"""
qt_s1 s1
} finally {
}
}
}

View File

@@ -44,4 +44,6 @@ suite("test_catalog_ddl", "p0,external,external_docker") {
result = sql """show create catalog ${catalog1};"""
assertEquals(result.size(), 1)
assertTrue(result[0][1].contains("COMMENT \"alter_comment\""))
sql """drop catalog ${catalog1}"""
}

View File

@@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_hive_hudi_statistics", "p2,external,hive,hudi") {
String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
String catalog_name = "test_hive_hudi_statistics"
String db_name = "hudi_catalog"
sql """drop catalog if exists ${catalog_name};"""
sql """
create catalog if not exists ${catalog_name} properties (
'hadoop.username'='hadoop',
'type'='hms',
'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
);
"""
def table_id = get_table_id(catalog_name, db_name, "partitioned_mor_rt")
// analyze
sql """use `${catalog_name}`.`${db_name}`"""
sql """analyze table partitioned_mor_rt with sync"""
// select
def s1 = """select col_id,count,ndv,null_count,min,max,data_size_in_bytes from internal.__internal_schema.column_statistics where tbl_id = ${table_id} order by id;"""
qt_s1 s1
}
}