From 8a8d3bcb59284bbd82f48a7152d3ecd73534d23f Mon Sep 17 00:00:00 2001
From: Jibing-Li <64681310+Jibing-Li@users.noreply.github.com>
Date: Fri, 26 May 2023 10:31:02 +0800
Subject: [PATCH] [improvement](multi catalog, nereids) Support collecting hive
 table statistics by sql (#19955)

Support collecting Hive external table statistics by running SQL against the
Hive table. Going through SQL lets us gather every statistic we already
collect for OLAP tables, including the min and max values of string columns.

With 3 BEs (16 cores, 64 GB), collecting statistics for all columns of all
TPC-H 100GB tables takes less than 2 minutes, and the same holds for all
columns of the SSB 100GB tables.
---
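The sketch below illustrates what the new ANALYZE_SQL_TABLE_TEMPLATE expands
to. It is an illustration only: it assumes the StringSubstitutor used by
HiveAnalysisTask is org.apache.commons.text.StringSubstitutor, and every id,
name, and the dataSizeFunction value here is invented; the real values come
from buildTableStatsParams(), FeConstants.INTERNAL_DB_NAME,
StatisticConstants.STATISTIC_TBL_NAME, and getDataSizeFunction(col). The
target names __internal_schema and column_statistics mirror the patch's own
javadoc.

    import org.apache.commons.text.StringSubstitutor;

    import java.util.HashMap;
    import java.util.Map;

    public class ColumnStatsSqlDemo {
        // Same template text as ANALYZE_SQL_TABLE_TEMPLATE in HiveAnalysisTask below.
        private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
                + "${internalDB}.${columnStatTbl}"
                + " SELECT "
                + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
                + "${catalogId} AS catalog_id, "
                + "${dbId} AS db_id, "
                + "${tblId} AS tbl_id, "
                + "${idxId} AS idx_id, "
                + "'${colId}' AS col_id, "
                + "${partId} AS part_id, "
                + "COUNT(1) AS row_count, "
                + "NDV(`${colName}`) AS ndv, "
                + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
                + "MIN(`${colName}`) AS min, "
                + "MAX(`${colName}`) AS max, "
                + "${dataSizeFunction} AS data_size, "
                + "NOW() "
                + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";

        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("internalDB", "__internal_schema");    // FeConstants.INTERNAL_DB_NAME
            params.put("columnStatTbl", "column_statistics"); // StatisticConstants.STATISTIC_TBL_NAME
            params.put("catalogId", "90001");                 // invented ids
            params.put("dbId", "90002");
            params.put("tblId", "90003");
            params.put("idxId", "-1");
            params.put("colId", "l_orderkey");                // invented column
            params.put("colName", "l_orderkey");
            params.put("partId", "NULL");
            params.put("dataSizeFunction", "COUNT(1) * 8");   // stand-in for getDataSizeFunction(col)
            params.put("catalogName", "hive");                // invented catalog/db/table
            params.put("dbName", "tpch100");
            params.put("tblName", "lineitem");
            // Prints one INSERT ... SELECT that aggregates the Hive table in place,
            // ending with: FROM `hive`.`tpch100`.`lineitem`
            System.out.println(new StringSubstitutor(params).replace(ANALYZE_SQL_TABLE_TEMPLATE));
        }
    }

The row-count statement is built the same way from ANALYZE_TABLE_COUNT_TEMPLATE
and, with the same invented names, would expand to:
SELECT COUNT(1) as rowCount FROM `hive`.`tpch100`.`lineitem`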
 .../doris/statistics/HMSAnalysisTask.java     |   2 +-
 .../doris/statistics/HiveAnalysisTask.java    | 104 +++++++++++++++++-
 2 files changed, 99 insertions(+), 7 deletions(-)
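The two files above cooperate through a plain hook-and-override pattern:
HMSAnalysisTask declares getStatsBySql() as a hook that throws until a
subclass implements it (this patch only widens its signature with throws
Exception), and HiveAnalysisTask overrides it with the SQL-based collection.
A compilable sketch with simplified, invented names (the real classes use
NotImplementedException and carry task state):

    public class StatsDispatchSketch {
        abstract static class HmsTask {
            // Mirrors HMSAnalysisTask.getStatsBySql(); the patch widens it with "throws Exception".
            protected void getStatsBySql() throws Exception {
                throw new UnsupportedOperationException("getStatsBySql is not implemented");
            }
        }

        static class HiveTask extends HmsTask {
            @Override
            protected void getStatsBySql() throws Exception {
                // The real override runs four steps; the partition-level two are still TODO.
                System.out.println("table row count -> table_statistics");
                System.out.println("column aggregates -> column_statistics");
                System.out.println("partition stats / partition column stats: TODO");
            }
        }

        public static void main(String[] args) throws Exception {
            HmsTask task = new HiveTask();
            task.getStatsBySql(); // dispatches to the Hive override
        }
    }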
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
index 8bfaa1ca8a..bb3c073002 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java
@@ -42,7 +42,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
      * Collect the stats for external table through sql.
      * @return ColumnStatistics
      */
-    protected void getStatsBySql() {
+    protected void getStatsBySql() throws Exception {
         throw new NotImplementedException("getColumnStatsBySql is not implemented");
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
index 305712bc6d..d9469c0024 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java
@@ -23,6 +23,7 @@ import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.qe.AutoCloseConnectContext;
 import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.util.InternalQueryResult;
 import org.apache.doris.statistics.util.StatisticsUtil;
 
 import com.google.common.base.Preconditions;
@@ -59,6 +60,28 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
     public static final String TIMESTAMP = "transient_lastDdlTime";
     public static final String DELIMITER = "-";
 
+    private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+            + "${internalDB}.${columnStatTbl}"
+            + " SELECT "
+            + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+            + "${catalogId} AS catalog_id, "
+            + "${dbId} AS db_id, "
+            + "${tblId} AS tbl_id, "
+            + "${idxId} AS idx_id, "
+            + "'${colId}' AS col_id, "
+            + "${partId} AS part_id, "
+            + "COUNT(1) AS row_count, "
+            + "NDV(`${colName}`) AS ndv, "
+            + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+            + "MIN(`${colName}`) AS min, "
+            + "MAX(`${colName}`) AS max, "
+            + "${dataSizeFunction} AS data_size, "
+            + "NOW() "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
+    private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount "
+            + "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
+
     private final boolean isTableLevelTask;
 
     public HiveAnalysisTask(AnalysisTaskInfo info) {
@@ -66,21 +89,90 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
         isTableLevelTask = info.externalTableLevelTask;
     }
 
-    private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+    private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
             + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
             + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
 
-    private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+    private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
             + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', "
             + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
 
-    private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
+    private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO "
             + "${internalDB}.${columnStatTbl}"
             + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, "
             + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
 
+    /**
+     * Collect the stats for external table through sql.
+     */
+    @Override
+    protected void getStatsBySql() throws Exception {
+        getTableStatsBySql();
+        getPartitionStatsBySql();
+        getTableColumnStatsBySql();
+        getPartitionColumnStatsBySql();
+    }
+
+    /**
+     * Get table row count and insert the result to __internal_schema.table_statistics
+     */
+    private void getTableStatsBySql() throws Exception {
+        Map<String, String> params = buildTableStatsParams();
+        List<InternalQueryResult.ResultRow> columnResult =
+                StatisticsUtil.execStatisticQuery(new StringSubstitutor(params)
+                        .replace(ANALYZE_TABLE_COUNT_TEMPLATE));
+        String rowCount = columnResult.get(0).getColumnValue("rowCount");
+        params.put("rowCount", rowCount);
+        StatisticsRepository.persistTableStats(params);
+    }
+
+    /**
+     * Get column statistics and insert the result to __internal_schema.column_statistics
+     */
+    private void getTableColumnStatsBySql() throws Exception {
+        Map<String, String> params = buildTableStatsParams();
+        params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
+        params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
+        params.put("colName", col.getName());
+        params.put("colId", info.colName);
+        params.put("dataSizeFunction", getDataSizeFunction(col));
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_SQL_TABLE_TEMPLATE);
+        try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
+            r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
+            this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
+            this.stmtExecutor.execute();
+        }
+        Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
+    }
+
+    private void getPartitionStatsBySql() {
+        // TODO: Collect partition stats by sql.
+    }
+
+    private void getPartitionColumnStatsBySql() {
+        // TODO: Collect partition column stats by sql.
+    }
+
+    private Map<String, String> buildTableStatsParams() {
+        Map<String, String> commonParams = new HashMap<>();
+        commonParams.put("id", String.valueOf(tbl.getId()));
+        commonParams.put("catalogId", String.valueOf(catalog.getId()));
+        commonParams.put("dbId", String.valueOf(db.getId()));
+        commonParams.put("tblId", String.valueOf(tbl.getId()));
+        commonParams.put("indexId", "-1");
+        commonParams.put("idxId", "-1");
+        commonParams.put("partId", "NULL");
+        commonParams.put("catalogName", catalog.getName());
+        commonParams.put("dbName", db.getFullName());
+        commonParams.put("tblName", tbl.getName());
+        commonParams.put("type", col.getType().toString());
+        commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis()));
+        return commonParams;
+    }
+
     @Override
     protected void getStatsByMeta() throws Exception {
         if (isTableLevelTask) {
@@ -108,7 +200,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
         }
         params.put("id", genColumnStatId(tbl.getId(), -1, "", null));
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+        String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_TEMPLATE);
         try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
             r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
             this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
@@ -143,7 +235,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
             getStatData(data, params, rowCount);
         }
         StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
-        String sql = stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE);
+        String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_COLUMN_TEMPLATE);
         try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
             r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
             this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
@@ -178,7 +270,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
             ColumnStatisticsData data = stat.getStatsData();
             getStatData(data, params, rowCount);
             stringSubstitutor = new StringSubstitutor(params);
-            partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE));
+            partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_META_PARTITION_COLUMN_TEMPLATE));
         }
         // Update partition level stats for this column.
         for (String partitionSql : partitionAnalysisSQLs) {
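A minimal sketch of the new row-count path in getTableStatsBySql(), with the
two Doris-internal calls stubbed out because only their call sites appear in
this patch; the identifiers and the returned count are invented, and
org.apache.commons.text.StringSubstitutor is again assumed. Compile with
Apache Commons Text on the classpath.

    import org.apache.commons.text.StringSubstitutor;

    import java.util.HashMap;
    import java.util.Map;

    public class TableRowCountFlowSketch {
        // Same text as the patch's ANALYZE_TABLE_COUNT_TEMPLATE.
        private static final String ANALYZE_TABLE_COUNT_TEMPLATE =
                "SELECT COUNT(1) as rowCount FROM `${catalogName}`.`${dbName}`.`${tblName}`";

        // Stand-in for StatisticsUtil.execStatisticQuery(sql).get(0).getColumnValue("rowCount").
        static String queryRowCount(String sql) {
            System.out.println("would execute: " + sql);
            return "600037902"; // invented count
        }

        // Stand-in for StatisticsRepository.persistTableStats(params).
        static void persistTableStats(Map<String, String> params) {
            System.out.println("would persist rowCount=" + params.get("rowCount"));
        }

        public static void main(String[] args) {
            // Invented identifiers; the patch builds these in buildTableStatsParams().
            Map<String, String> params = new HashMap<>();
            params.put("catalogName", "hive");
            params.put("dbName", "tpch100");
            params.put("tblName", "lineitem");
            String sql = new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE);
            params.put("rowCount", queryRowCount(sql));
            persistTableStats(params);
        }
    }

As in the patch, this path and the column path both start from the same
buildTableStatsParams() map and differ only in the extra keys each one adds
before substitution.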