diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index 8bfaa1ca8a..bb3c073002 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -42,7 +42,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask { * Collect the stats for external table through sql. * @return ColumnStatistics */ - protected void getStatsBySql() { + protected void getStatsBySql() throws Exception { throw new NotImplementedException("getColumnStatsBySql is not implemented"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 305712bc6d..d9469c0024 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -23,6 +23,7 @@ import org.apache.doris.common.util.TimeUtils; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.InternalQueryResult; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.base.Preconditions; @@ -59,6 +60,28 @@ public class HiveAnalysisTask extends HMSAnalysisTask { public static final String TIMESTAMP = "transient_lastDdlTime"; public static final String DELIMITER = "-"; + private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT " + + "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, " + + "${catalogId} AS catalog_id, " + + "${dbId} AS db_id, " + + "${tblId} AS tbl_id, " + + "${idxId} AS idx_id, " + + "'${colId}' AS col_id, " + + "${partId} AS part_id, " + + "COUNT(1) AS row_count, " + + 
"NDV(`${colName}`) AS ndv, " + + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + + "${dataSizeFunction} AS data_size, " + + "NOW() " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + + private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT COUNT(1) as rowCount " + + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; + private final boolean isTableLevelTask; public HiveAnalysisTask(AnalysisTaskInfo info) { @@ -66,21 +89,90 @@ public class HiveAnalysisTask extends HMSAnalysisTask { isTableLevelTask = info.externalTableLevelTask; } - private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT INTO " + private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, " + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; - private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = "INSERT INTO " + private static final String ANALYZE_META_PARTITION_COLUMN_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', " + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; - private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO " + private static final String ANALYZE_META_TABLE_TEMPLATE = "INSERT INTO " + "${internalDB}.${columnStatTbl}" + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, " + "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')"; + /** + * Collect the stats for external table through sql. 
+ */ + @Override + protected void getStatsBySql() throws Exception { + getTableStatsBySql(); + getPartitionStatsBySql(); + getTableColumnStatsBySql(); + getPartitionColumnStatsBySql(); + } + + /** + * Get table row count and insert the result to __internal_schema.table_statistics + */ + private void getTableStatsBySql() throws Exception { + Map<String, String> params = buildTableStatsParams(); + List<InternalQueryResult.ResultRow> columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); + String rowCount = columnResult.get(0).getColumnValue("rowCount"); + params.put("rowCount", rowCount); + StatisticsRepository.persistTableStats(params); + } + + /** + * Get column statistics and insert the result to __internal_schema.column_statistics + */ + private void getTableColumnStatsBySql() throws Exception { + Map<String, String> params = buildTableStatsParams(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(ANALYZE_SQL_TABLE_TEMPLATE); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); + } + + private void getPartitionStatsBySql() { + // TODO: Collect partition stats by sql. + } + + private void getPartitionColumnStatsBySql() { + // TODO: Collect partition column stats by sql. 
+ } + + private Map<String, String> buildTableStatsParams() { + Map<String, String> commonParams = new HashMap<>(); + commonParams.put("id", String.valueOf(tbl.getId())); + commonParams.put("catalogId", String.valueOf(catalog.getId())); + commonParams.put("dbId", String.valueOf(db.getId())); + commonParams.put("tblId", String.valueOf(tbl.getId())); + commonParams.put("indexId", "-1"); + commonParams.put("idxId", "-1"); + commonParams.put("partId", "NULL"); + commonParams.put("catalogName", catalog.getName()); + commonParams.put("dbName", db.getFullName()); + commonParams.put("tblName", tbl.getName()); + commonParams.put("type", col.getType().toString()); + commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); + return commonParams; + } + @Override protected void getStatsByMeta() throws Exception { if (isTableLevelTask) { @@ -108,7 +200,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask { } params.put("id", genColumnStatId(tbl.getId(), -1, "", null)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE); + String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_TEMPLATE); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); this.stmtExecutor = new StmtExecutor(r.connectContext, sql); @@ -143,7 +235,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask { getStatData(data, params, rowCount); } StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE); + String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_COLUMN_TEMPLATE); try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); this.stmtExecutor = new StmtExecutor(r.connectContext, sql); @@ -178,7 +270,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask { 
ColumnStatisticsData data = stat.getStatsData(); getStatData(data, params, rowCount); stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE)); + partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_META_PARTITION_COLUMN_TEMPLATE)); } // Update partition level stats for this column. for (String partitionSql : partitionAnalysisSQLs) {