[Improvement](statistics, multi catalog) Support Iceberg table stats collection (#21481)
Fetch Iceberg table stats automatically when querying a table. Collect accurate statistics for Iceberg tables by running ANALYZE SQL in Doris (the collect-by-meta option is removed).
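A sketch of the intended usage, with hypothetical catalog/table names (the statements below are illustrative, not part of this patch):

    -- Collect accurate stats for an external table through the regular ANALYZE path;
    -- for Iceberg tables this now renders the INSERT ... SELECT templates shown below.
    ANALYZE TABLE iceberg_catalog.tpch100.region;

    -- Collected results land in the internal statistics tables:
    SELECT * FROM __internal_schema.column_statistics WHERE col_id = 'r_regionkey';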
@@ -29,8 +29,7 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.HiveAnalysisTask;
import org.apache.doris.statistics.IcebergAnalysisTask;
import org.apache.doris.statistics.HMSAnalysisTask;
import org.apache.doris.statistics.TableStatistic;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
@@ -322,14 +321,7 @@ public class HMSExternalTable extends ExternalTable {
    @Override
    public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
        makeSureInitialized();
        switch (dlaType) {
            case HIVE:
                return new HiveAnalysisTask(info);
            case ICEBERG:
                return new IcebergAnalysisTask(info);
            default:
                throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
        }
        return new HMSAnalysisTask(info);
    }

    public String getViewText() {
@@ -473,6 +465,19 @@ public class HMSExternalTable extends ExternalTable {

    @Override
    public Optional<ColumnStatistic> getColumnStatistic(String colName) {
        makeSureInitialized();
        switch (dlaType) {
            case HIVE:
                return getHiveColumnStats(colName);
            case ICEBERG:
                return StatisticsUtil.getIcebergColumnStats(colName, HiveMetaStoreClientHelper.getIcebergTable(this));
            default:
                LOG.warn("get column stats for dlaType {} is not supported.", dlaType);
        }
        return Optional.empty();
    }

    private Optional<ColumnStatistic> getHiveColumnStats(String colName) {
        List<ColumnStatisticsObj> tableStats = getHiveTableColumnStats(Lists.newArrayList(colName));
        if (tableStats == null || tableStats.isEmpty()) {
            LOG.debug(String.format("No table stats found in Hive metastore for column %s in table %s.",
@@ -22,6 +22,8 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TIcebergTable;
import org.apache.doris.thrift.TTableDescriptor;
@@ -33,6 +35,7 @@ import org.apache.iceberg.types.Types;

import java.util.HashMap;
import java.util.List;
import java.util.Optional;

public class IcebergExternalTable extends ExternalTable {

@@ -134,4 +137,11 @@ public class IcebergExternalTable extends ExternalTable {
        return tTableDescriptor;
    }
}

    @Override
    public Optional<ColumnStatistic> getColumnStatistic(String colName) {
        makeSureInitialized();
        return StatisticsUtil.getIcebergColumnStats(colName,
                ((IcebergExternalCatalog) catalog).getIcebergTable(dbName, name));
    }
}
@@ -197,7 +197,7 @@ public final class HiveUtil {
                method = clazz.getDeclaredMethod("isSplitable", FileSystem.class, Path.class);
                break;
            } catch (NoSuchMethodException ignored) {
                LOG.warn("Class {} doesn't contain isSplitable method.", clazz);
                LOG.debug("Class {} doesn't contain isSplitable method.", clazz);
            }
        }

@@ -17,41 +17,248 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.InternalQueryResult;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HMSAnalysisTask extends BaseAnalysisTask {
    private static final Logger LOG = LogManager.getLogger(HMSAnalysisTask.class);

    protected HMSExternalTable table;
    public static final String TOTAL_SIZE = "totalSize";
    public static final String NUM_ROWS = "numRows";
    public static final String NUM_FILES = "numFiles";
    public static final String TIMESTAMP = "transient_lastDdlTime";

    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
            + "${catalogId} AS catalog_id, "
            + "${dbId} AS db_id, "
            + "${tblId} AS tbl_id, "
            + "${idxId} AS idx_id, "
            + "'${colId}' AS col_id, "
            + "${partId} AS part_id, "
            + "COUNT(1) AS row_count, "
            + "NDV(`${colName}`) AS ndv, "
            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
            + "MIN(`${colName}`) AS min, "
            + "MAX(`${colName}`) AS max, "
            + "${dataSizeFunction} AS data_size, "
            + "NOW() "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    private final boolean isTableLevelTask;
    private final boolean isSamplingPartition;
    private final boolean isPartitionOnly;
    private final Set<String> partitionNames;
    private HMSExternalTable table;

    public HMSAnalysisTask(AnalysisInfo info) {
        super(info);
        isTableLevelTask = info.externalTableLevelTask;
        isSamplingPartition = info.samplingPartition;
        isPartitionOnly = info.partitionOnly;
        partitionNames = info.partitionNames;
        table = (HMSExternalTable) tbl;
    }

    /**
     * Collect the column level stats for external table through metadata.
     */
    protected void getStatsByMeta() throws Exception {
        throw new NotImplementedException("Code is not implemented");
    }

    /**
     * Collect the stats for external table through sql.
     * @return ColumnStatistics
     */
    protected void getStatsBySql() throws Exception {
        throw new NotImplementedException("getColumnStatsBySql is not implemented");
    }

    @Override
    public void execute() throws Exception {
        if (Config.collect_external_table_stats_by_sql) {
            getStatsBySql();
        if (isTableLevelTask) {
            getTableStats();
        } else {
            getStatsByMeta();
            getTableColumnStats();
        }
    }

    /**
     * Get table row count and insert the result to __internal_schema.table_statistics
     */
    private void getTableStats() throws Exception {
        // Get table level information. An example sql for table stats:
        // INSERT INTO __internal_schema.table_statistics VALUES
        // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
        Map<String, String> parameters = table.getRemoteTable().getParameters();
        if (isPartitionOnly) {
            for (String partId : partitionNames) {
                StringBuilder sb = new StringBuilder();
                sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
                sb.append(" where ");
                String[] splits = partId.split("/");
                for (int i = 0; i < splits.length; i++) {
                    String value = splits[i].split("=")[1];
                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
                }
                sb.append(StringUtils.join(splits, " and "));
                Map<String, String> params = buildTableStatsParams(partId);
                setParameterData(parameters, params);
                List<InternalQueryResult.ResultRow> columnResult =
                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                                .replace(sb.toString()));
                String rowCount = columnResult.get(0).getColumnValue("rowCount");
                params.put("rowCount", rowCount);
                StatisticsRepository.persistTableStats(params);
            }
        } else {
            Map<String, String> params = buildTableStatsParams("NULL");
            List<InternalQueryResult.ResultRow> columnResult =
                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                            .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
            String rowCount = columnResult.get(0).getColumnValue("rowCount");
            params.put("rowCount", rowCount);
            StatisticsRepository.persistTableStats(params);
        }
    }
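The loop above turns a Hive partition id of the form `k1=v1/k2=v2` into a quoted WHERE clause. A rendered row-count query for one partition might look like this (catalog, table, and partition values are hypothetical):

    SELECT COUNT(1) as rowCount FROM `hive`.`tpch100`.`region` where nation='cn' and city='bj'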

    /**
     * Get column statistics and insert the result to __internal_schema.column_statistics
     */
    private void getTableColumnStats() throws Exception {
        // An example sql for a column stats:
        // INSERT INTO __internal_schema.column_statistics
        // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
        // 13002 AS catalog_id,
        // 13038 AS db_id,
        // 13055 AS tbl_id,
        // -1 AS idx_id,
        // 'r_regionkey' AS col_id,
        // 'NULL' AS part_id,
        // COUNT(1) AS row_count,
        // NDV(`r_regionkey`) AS ndv,
        // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
        // MIN(`r_regionkey`) AS min,
        // MAX(`r_regionkey`) AS max,
        // 0 AS data_size,
        // NOW() FROM `hive`.`tpch100`.`region`
        if (isPartitionOnly) {
            for (String partId : partitionNames) {
                StringBuilder sb = new StringBuilder();
                sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
                sb.append(" where ");
                String[] splits = partId.split("/");
                for (int i = 0; i < splits.length; i++) {
                    String value = splits[i].split("=")[1];
                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
                }
                sb.append(StringUtils.join(splits, " and "));
                Map<String, String> params = buildTableStatsParams(partId);
                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
                params.put("colName", col.getName());
                params.put("colId", info.colName);
                params.put("dataSizeFunction", getDataSizeFunction(col));
                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
                String sql = stringSubstitutor.replace(sb.toString());
                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
                    r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
                    this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
                    this.stmtExecutor.execute();
                }
            }
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
            if (isSamplingPartition) {
                sb.append(" where 1=1 ");
                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
                int parts = splitExample.length;
                List<String> partNames = new ArrayList<>();
                for (String split : splitExample) {
                    partNames.add(split.split("=")[0]);
                }
                List<List<String>> valueLists = new ArrayList<>();
                for (int i = 0; i < parts; i++) {
                    valueLists.add(new ArrayList<>());
                }
                for (String partId : partitionNames) {
                    String[] partIds = partId.split("/");
                    for (int i = 0; i < partIds.length; i++) {
                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
                    }
                }
                for (int i = 0; i < parts; i++) {
                    sb.append(" and ");
                    sb.append(partNames.get(i));
                    sb.append(" in (");
                    sb.append(StringUtils.join(valueLists.get(i), ","));
                    sb.append(") ");
                }
            }
            Map<String, String> params = buildTableStatsParams("NULL");
            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
            params.put("colName", col.getName());
            params.put("colId", info.colName);
            params.put("dataSizeFunction", getDataSizeFunction(col));
            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
            String sql = stringSubstitutor.replace(sb.toString());
            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
                this.stmtExecutor.execute();
            }
            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
        }
    }
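For the sampling branch above, the rendered filter collects one IN-list per partition key rather than one statement per partition. With two sampled partitions `dt=2023-06-01/region=cn` and `dt=2023-06-02/region=us` (hypothetical values), the appended tail is:

    where 1=1 and dt in ('2023-06-01','2023-06-02') and region in ('cn','us')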

    private Map<String, String> buildTableStatsParams(String partId) {
        Map<String, String> commonParams = new HashMap<>();
        String id = StatisticsUtil.constructId(tbl.getId(), -1);
        if (!partId.equals("NULL")) {
            id = StatisticsUtil.constructId(id, partId);
        }
        commonParams.put("id", id);
        commonParams.put("catalogId", String.valueOf(catalog.getId()));
        commonParams.put("dbId", String.valueOf(db.getId()));
        commonParams.put("tblId", String.valueOf(tbl.getId()));
        commonParams.put("indexId", "-1");
        commonParams.put("idxId", "-1");
        commonParams.put("partId", "\'" + partId + "\'");
        commonParams.put("catalogName", catalog.getName());
        commonParams.put("dbName", db.getFullName());
        commonParams.put("tblName", tbl.getName());
        if (col != null) {
            commonParams.put("type", col.getType().toString());
        }
        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
        return commonParams;
    }

    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
        String numRows = "";
        String timestamp = "";
        if (parameters.containsKey(NUM_ROWS)) {
            numRows = parameters.get(NUM_ROWS);
        }
        if (parameters.containsKey(TIMESTAMP)) {
            timestamp = parameters.get(TIMESTAMP);
        }
        params.put("numRows", numRows);
        params.put("rowCount", numRows);
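        // transient_lastDdlTime is recorded by the Hive metastore in epoch seconds;
        // the * 1000 below converts it to milliseconds for Instant.ofEpochMilli.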
params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
|
||||
LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
|
||||
ZoneId.systemDefault())));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,370 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.InternalQueryResult;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class HiveAnalysisTask extends HMSAnalysisTask {
    private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class);

    public static final String TOTAL_SIZE = "totalSize";
    public static final String NUM_ROWS = "numRows";
    public static final String NUM_FILES = "numFiles";
    public static final String TIMESTAMP = "transient_lastDdlTime";

    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " SELECT "
            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
            + "${catalogId} AS catalog_id, "
            + "${dbId} AS db_id, "
            + "${tblId} AS tbl_id, "
            + "${idxId} AS idx_id, "
            + "'${colId}' AS col_id, "
            + "${partId} AS part_id, "
            + "COUNT(1) AS row_count, "
            + "NDV(`${colName}`) AS ndv, "
            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
            + "MIN(`${colName}`) AS min, "
            + "MAX(`${colName}`) AS max, "
            + "${dataSizeFunction} AS data_size, "
            + "NOW() "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

    private final boolean isTableLevelTask;
    private final boolean isSamplingPartition;
    private final boolean isPartitionOnly;
    private final Set<String> partitionNames;

    public HiveAnalysisTask(AnalysisInfo info) {
        super(info);
        isTableLevelTask = info.externalTableLevelTask;
        isSamplingPartition = info.samplingPartition;
        isPartitionOnly = info.partitionOnly;
        partitionNames = info.partitionNames;
    }

    private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";

    private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', "
            + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";

    private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, "
            + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";

    /**
     * Collect the stats for external table through sql.
     */
    @Override
    protected void getStatsBySql() throws Exception {
        if (isTableLevelTask) {
            getTableStatsBySql();
        } else {
            getTableColumnStatsBySql();
        }
    }

    /**
     * Get table row count and insert the result to __internal_schema.table_statistics
     */
    private void getTableStatsBySql() throws Exception {
        // Get table level information. An example sql for table stats:
        // INSERT INTO __internal_schema.table_statistics VALUES
        // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW())
        Map<String, String> parameters = table.getRemoteTable().getParameters();
        if (isPartitionOnly) {
            for (String partId : partitionNames) {
                StringBuilder sb = new StringBuilder();
                sb.append(ANALYZE_TABLE_COUNT_TEMPLATE);
                sb.append(" where ");
                String[] splits = partId.split("/");
                for (int i = 0; i < splits.length; i++) {
                    String value = splits[i].split("=")[1];
                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
                }
                sb.append(StringUtils.join(splits, " and "));
                Map<String, String> params = buildTableStatsParams(partId);
                setParameterData(parameters, params);
                List<InternalQueryResult.ResultRow> columnResult =
                        StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                                .replace(sb.toString()));
                String rowCount = columnResult.get(0).getColumnValue("rowCount");
                params.put("rowCount", rowCount);
                StatisticsRepository.persistTableStats(params);
            }
        } else {
            Map<String, String> params = buildTableStatsParams("NULL");
            List<InternalQueryResult.ResultRow> columnResult =
                    StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
                            .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
            String rowCount = columnResult.get(0).getColumnValue("rowCount");
            params.put("rowCount", rowCount);
            StatisticsRepository.persistTableStats(params);
        }
    }

    /**
     * Get column statistics and insert the result to __internal_schema.column_statistics
     */
    private void getTableColumnStatsBySql() throws Exception {
        // An example sql for a column stats:
        // INSERT INTO __internal_schema.column_statistics
        // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id,
        // 13002 AS catalog_id,
        // 13038 AS db_id,
        // 13055 AS tbl_id,
        // -1 AS idx_id,
        // 'r_regionkey' AS col_id,
        // 'NULL' AS part_id,
        // COUNT(1) AS row_count,
        // NDV(`r_regionkey`) AS ndv,
        // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count,
        // MIN(`r_regionkey`) AS min,
        // MAX(`r_regionkey`) AS max,
        // 0 AS data_size,
        // NOW() FROM `hive`.`tpch100`.`region`
        if (isPartitionOnly) {
            for (String partId : partitionNames) {
                StringBuilder sb = new StringBuilder();
                sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
                sb.append(" where ");
                String[] splits = partId.split("/");
                for (int i = 0; i < splits.length; i++) {
                    String value = splits[i].split("=")[1];
                    splits[i] = splits[i].replace(value, "\'" + value + "\'");
                }
                sb.append(StringUtils.join(splits, " and "));
                Map<String, String> params = buildTableStatsParams(partId);
                params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
                params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
                params.put("colName", col.getName());
                params.put("colId", info.colName);
                params.put("dataSizeFunction", getDataSizeFunction(col));
                StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
                String sql = stringSubstitutor.replace(sb.toString());
                try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
                    r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
                    this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
                    this.stmtExecutor.execute();
                }
            }
        } else {
            StringBuilder sb = new StringBuilder();
            sb.append(ANALYZE_SQL_TABLE_TEMPLATE);
            if (isSamplingPartition) {
                sb.append(" where 1=1 ");
                String[] splitExample = partitionNames.stream().findFirst().get().split("/");
                int parts = splitExample.length;
                List<String> partNames = new ArrayList<>();
                for (String split : splitExample) {
                    partNames.add(split.split("=")[0]);
                }
                List<List<String>> valueLists = new ArrayList<>();
                for (int i = 0; i < parts; i++) {
                    valueLists.add(new ArrayList<>());
                }
                for (String partId : partitionNames) {
                    String[] partIds = partId.split("/");
                    for (int i = 0; i < partIds.length; i++) {
                        valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'");
                    }
                }
                for (int i = 0; i < parts; i++) {
                    sb.append(" and ");
                    sb.append(partNames.get(i));
                    sb.append(" in (");
                    sb.append(StringUtils.join(valueLists.get(i), ","));
                    sb.append(") ");
                }
            }
            Map<String, String> params = buildTableStatsParams("NULL");
            params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
            params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
            params.put("colName", col.getName());
            params.put("colId", info.colName);
            params.put("dataSizeFunction", getDataSizeFunction(col));
            StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
            String sql = stringSubstitutor.replace(sb.toString());
            try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
                r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
                this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
                this.stmtExecutor.execute();
            }
            Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
        }
    }

    private Map<String, String> buildTableStatsParams(String partId) {
        Map<String, String> commonParams = new HashMap<>();
        String id = StatisticsUtil.constructId(tbl.getId(), -1);
        if (!partId.equals("NULL")) {
            id = StatisticsUtil.constructId(id, partId);
        }
        commonParams.put("id", id);
        commonParams.put("catalogId", String.valueOf(catalog.getId()));
        commonParams.put("dbId", String.valueOf(db.getId()));
        commonParams.put("tblId", String.valueOf(tbl.getId()));
        commonParams.put("indexId", "-1");
        commonParams.put("idxId", "-1");
        commonParams.put("partId", "\'" + partId + "\'");
        commonParams.put("catalogName", catalog.getName());
        commonParams.put("dbName", db.getFullName());
        commonParams.put("tblName", tbl.getName());
        if (col != null) {
            commonParams.put("type", col.getType().toString());
        }
        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
        return commonParams;
    }

    @Override
    protected void getStatsByMeta() throws Exception {
        // To be removed.
    }

    private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) {
        long ndv = 0;
        long nulls = 0;
        String min = "";
        String max = "";
        long colSize = 0;
        if (!data.isSetStringStats()) {
            colSize = rowCount * col.getType().getSlotSize();
        }
        // Collect ndv, nulls, min and max for different data type.
        if (data.isSetLongStats()) {
            LongColumnStatsData longStats = data.getLongStats();
            ndv = longStats.getNumDVs();
            nulls = longStats.getNumNulls();
            min = String.valueOf(longStats.getLowValue());
            max = String.valueOf(longStats.getHighValue());
        } else if (data.isSetStringStats()) {
            StringColumnStatsData stringStats = data.getStringStats();
            ndv = stringStats.getNumDVs();
            nulls = stringStats.getNumNulls();
            double avgColLen = stringStats.getAvgColLen();
            colSize = Math.round(avgColLen * rowCount);
        } else if (data.isSetDecimalStats()) {
            DecimalColumnStatsData decimalStats = data.getDecimalStats();
            ndv = decimalStats.getNumDVs();
            nulls = decimalStats.getNumNulls();
            if (decimalStats.isSetLowValue()) {
                Decimal lowValue = decimalStats.getLowValue();
                if (lowValue != null) {
                    BigDecimal lowDecimal = new BigDecimal(new BigInteger(lowValue.getUnscaled()), lowValue.getScale());
                    min = lowDecimal.toString();
                }
            }
            if (decimalStats.isSetHighValue()) {
                Decimal highValue = decimalStats.getHighValue();
                if (highValue != null) {
                    BigDecimal highDecimal = new BigDecimal(
                            new BigInteger(highValue.getUnscaled()), highValue.getScale());
                    max = highDecimal.toString();
                }
            }
        } else if (data.isSetDoubleStats()) {
            DoubleColumnStatsData doubleStats = data.getDoubleStats();
            ndv = doubleStats.getNumDVs();
            nulls = doubleStats.getNumNulls();
            min = String.valueOf(doubleStats.getLowValue());
            max = String.valueOf(doubleStats.getHighValue());
        } else if (data.isSetDateStats()) {
            DateColumnStatsData dateStats = data.getDateStats();
            ndv = dateStats.getNumDVs();
            nulls = dateStats.getNumNulls();
            if (dateStats.isSetLowValue()) {
                org.apache.hadoop.hive.metastore.api.Date lowValue = dateStats.getLowValue();
                if (lowValue != null) {
                    LocalDate lowDate = LocalDate.ofEpochDay(lowValue.getDaysSinceEpoch());
                    min = lowDate.toString();
                }
            }
            if (dateStats.isSetHighValue()) {
                org.apache.hadoop.hive.metastore.api.Date highValue = dateStats.getHighValue();
                if (highValue != null) {
                    LocalDate highDate = LocalDate.ofEpochDay(highValue.getDaysSinceEpoch());
                    max = highDate.toString();
                }
            }
        } else {
            throw new RuntimeException("Not supported data type.");
        }
        params.put("ndv", String.valueOf(ndv));
        params.put("nulls", String.valueOf(nulls));
        params.put("min", min);
        params.put("max", max);
        params.put("dataSize", String.valueOf(colSize));
    }

    private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
        String numRows = "";
        String timestamp = "";
        if (parameters.containsKey(NUM_ROWS)) {
            numRows = parameters.get(NUM_ROWS);
        }
        if (parameters.containsKey(TIMESTAMP)) {
            timestamp = parameters.get(TIMESTAMP);
        }
        params.put("numRows", numRows);
        params.put("rowCount", numRows);
        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(
                LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000),
                        ZoneId.systemDefault())));
    }
}
@@ -1,121 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.property.constants.HMSProperties;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.text.StringSubstitutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

public class IcebergAnalysisTask extends HMSAnalysisTask {

    private long numRows = 0;
    private long dataSize = 0;
    private long numNulls = 0;

    public IcebergAnalysisTask(AnalysisInfo info) {
        super(info);
    }

    private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
            + "${internalDB}.${columnStatTbl}"
            + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
            + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')";

    @Override
    protected void getStatsByMeta() throws Exception {
        Table icebergTable = getIcebergTable();
        TableScan tableScan = icebergTable.newScan().includeColumnStats();
        for (FileScanTask task : tableScan.planFiles()) {
            processDataFile(task.file(), task.spec());
        }
        updateStats();
    }

    private Table getIcebergTable() {
        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
        Configuration conf = new HdfsConfiguration();
        for (Map.Entry<String, String> entry : table.getHadoopProperties().entrySet()) {
            conf.set(entry.getKey(), entry.getValue());
        }
        hiveCatalog.setConf(conf);
        Map<String, String> catalogProperties = new HashMap<>();
        catalogProperties.put(HMSProperties.HIVE_METASTORE_URIS, table.getMetastoreUri());
        catalogProperties.put("uri", table.getMetastoreUri());
        hiveCatalog.initialize("hive", catalogProperties);
        return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
    }

    private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) {
        int colId = -1;
        for (Types.NestedField column : partitionSpec.schema().columns()) {
            if (column.name().equals(col.getName())) {
                colId = column.fieldId();
                break;
            }
        }
        if (colId == -1) {
            throw new RuntimeException(String.format("Column %s not exist.", col.getName()));
        }
        dataSize += dataFile.columnSizes().get(colId);
        numRows += dataFile.recordCount();
        numNulls += dataFile.nullValueCounts().get(colId);
    }

    private void updateStats() throws Exception {
        Map<String, String> params = new HashMap<>();
        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
        params.put("id", tbl.getId() + "-" + col.getName());
        params.put("catalogId", String.valueOf(catalog.getId()));
        params.put("dbId", String.valueOf(db.getId()));
        params.put("tblId", String.valueOf(tbl.getId()));
        params.put("colId", String.valueOf(col.getName()));
        params.put("numRows", String.valueOf(numRows));
        params.put("nulls", String.valueOf(numNulls));
        params.put("dataSize", String.valueOf(dataSize));
        params.put("update_time", TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));

        // Update table level stats info of this column.
        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
        String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE);
        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
            this.stmtExecutor.execute();
        }
    }
}
@@ -60,6 +60,7 @@ import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
@@ -71,9 +72,12 @@ import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -594,4 +598,49 @@ public class StatisticsUtil {
        }
        return totalSize / estimatedRowSize;
    }

    /**
     * Get Iceberg column statistics.
     * @param colName
     * @param table Iceberg table.
     * @return Optional Column statistic for the given column.
     */
    public static Optional<ColumnStatistic> getIcebergColumnStats(String colName, org.apache.iceberg.Table table) {
        TableScan tableScan = table.newScan().includeColumnStats();
        ColumnStatisticBuilder columnStatisticBuilder = new ColumnStatisticBuilder();
        columnStatisticBuilder.setCount(0);
        columnStatisticBuilder.setMaxValue(Double.MAX_VALUE);
        columnStatisticBuilder.setMinValue(Double.MIN_VALUE);
        columnStatisticBuilder.setDataSize(0);
        columnStatisticBuilder.setAvgSizeByte(0);
        columnStatisticBuilder.setNumNulls(0);
        for (FileScanTask task : tableScan.planFiles()) {
            processDataFile(task.file(), task.spec(), colName, columnStatisticBuilder);
        }
        if (columnStatisticBuilder.getCount() > 0) {
            columnStatisticBuilder.setAvgSizeByte(columnStatisticBuilder.getDataSize()
                    / columnStatisticBuilder.getCount());
        }
        return Optional.of(columnStatisticBuilder.build());
    }
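A worked example of the accumulation (file-level numbers are hypothetical): two data files whose column sizes for `colName` are 1200 and 800 bytes, with record counts 100 and 60, give dataSize = 2000 and count = 160, so avgSizeByte = 2000 / 160 = 12.5. The per-file accumulation itself is done in processDataFile below.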

    private static void processDataFile(DataFile dataFile, PartitionSpec partitionSpec,
            String colName, ColumnStatisticBuilder columnStatisticBuilder) {
        int colId = -1;
        for (Types.NestedField column : partitionSpec.schema().columns()) {
            if (column.name().equals(colName)) {
                colId = column.fieldId();
                break;
            }
        }
        if (colId == -1) {
            throw new RuntimeException(String.format("Column %s not exist.", colName));
        }
        // Update the data size, count and num of nulls in columnStatisticBuilder.
        // TODO: Get min max value.
        columnStatisticBuilder.setDataSize(columnStatisticBuilder.getDataSize() + dataFile.columnSizes().get(colId));
        columnStatisticBuilder.setCount(columnStatisticBuilder.getCount() + dataFile.recordCount());
        columnStatisticBuilder.setNumNulls(columnStatisticBuilder.getNumNulls()
                + dataFile.nullValueCounts().get(colId));
    }
}