diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java index e06de15db0..b1e42d343a 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -64,9 +64,11 @@ public final class FeMetaVersion { public static final int VERSION_121 = 121; // For IndexChangeJob public static final int VERSION_122 = 122; + // For AnalysisInfo + public static final int VERSION_123 = 123; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_122; + public static final int VERSION_CURRENT = VERSION_123; // all logs meta version should >= the minimum version, so that we could remove many if clause, for example // if (FE_METAVERSION < VERSION_94) ... diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index cf95aea327..37f187ac85 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2791,7 +2791,7 @@ show_create_reporitory_stmt ::= // analyze statment analyze_stmt ::= // statistics - KW_ANALYZE KW_TABLE table_name:tbl opt_col_list:cols + KW_ANALYZE KW_TABLE table_name:tbl opt_partition_names:partitions opt_col_list:cols opt_with_analysis_properties:withAnalysisProperties opt_properties:properties {: if (properties == null) { @@ -2805,7 +2805,7 @@ analyze_stmt ::= properties.put("analysis.type", "FUNDAMENTALS"); } AnalyzeProperties analyzeProperties= new AnalyzeProperties(properties); - RESULT = new AnalyzeTblStmt(tbl, cols, analyzeProperties); + RESULT = new AnalyzeTblStmt(tbl, partitions, cols, analyzeProperties); :} | KW_ANALYZE KW_DATABASE ident:ctlName DOT ident:dbName opt_with_analysis_properties:withAnalysisProperties opt_properties:properties diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java index 793902b327..01888461ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeTblStmt.java @@ -24,6 +24,7 @@ import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -47,12 +48,14 @@ import java.util.stream.Collectors; /** * Column Statistics Collection Syntax: * ANALYZE [ SYNC ] TABLE table_name + * [ PARTITIONS (partition_name [, ...])] * [ (column_name [, ...]) ] * [ [WITH SYNC] | [WITH INCREMENTAL] | [WITH SAMPLE PERCENT | ROWS ] ] * [ PROPERTIES ('key' = 'value', ...) ]; *

* Column histogram collection syntax: * ANALYZE [ SYNC ] TABLE table_name + * [ partitions (partition_name [, ...])] * [ (column_name [, ...]) ] * UPDATE HISTOGRAM * [ [ WITH SYNC ][ WITH INCREMENTAL ][ WITH SAMPLE PERCENT | ROWS ][ WITH BUCKETS ] ] @@ -64,6 +67,7 @@ import java.util.stream.Collectors; * - sample percent | rows:Collect statistics by sampling. Scale and number of rows can be sampled. * - buckets:Specifies the maximum number of buckets generated when collecting histogram statistics. * - table_name: The purpose table for collecting statistics. Can be of the form `db_name.table_name`. + * - partition_name: The specified destination partition must be a partition that exists in `table_name`, * - column_name: The specified destination column must be a column that exists in `table_name`, * and multiple column names are separated by commas. * - properties:Properties used to set statistics tasks. Currently only the following configurations @@ -79,16 +83,19 @@ public class AnalyzeTblStmt extends AnalyzeStmt { private final TableName tableName; private List columnNames; + private List partitionNames; // after analyzed private long dbId; private TableIf table; public AnalyzeTblStmt(TableName tableName, + PartitionNames partitionNames, List columnNames, AnalyzeProperties properties) { super(properties); this.tableName = tableName; + this.partitionNames = partitionNames == null ? null : partitionNames.getPartitionNames(); this.columnNames = columnNames; this.analyzeProperties = properties; } @@ -212,6 +219,30 @@ public class AnalyzeTblStmt extends AnalyzeStmt { .stream().map(Column::getName).collect(Collectors.toSet()) : Sets.newHashSet(columnNames); } + public Set getPartitionNames() { + Set partitions = partitionNames == null ? 
table.getPartitionNames() : Sets.newHashSet(partitionNames); + if (isSamplingPartition()) { + int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum(); + partitions = partitions.stream().limit(partNum).collect(Collectors.toSet()); + } + return partitions; + } + + public boolean isPartitionOnly() { + return partitionNames != null; + } + + public boolean isSamplingPartition() { + if (!(table instanceof HMSExternalTable) || partitionNames != null) { + return false; + } + int partNum = ConnectContext.get().getSessionVariable().getExternalTableAnalyzePartNum(); + if (partNum == -1) { + return false; + } + return table.getPartitionNames().size() > partNum; + } + @Override public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_NO_SYNC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 9270ada4ee..98c941e987 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -18,18 +18,18 @@ package org.apache.doris.catalog.external; import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.catalog.HiveMetaStoreClientHelper; import org.apache.doris.catalog.HudiUtils; import org.apache.doris.catalog.Type; -import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient; import org.apache.doris.statistics.AnalysisInfo; import org.apache.doris.statistics.BaseAnalysisTask; -import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.HiveAnalysisTask; import org.apache.doris.statistics.IcebergAnalysisTask; 
+import org.apache.doris.statistics.StatisticsRepository; +import org.apache.doris.statistics.TableStatistic; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -47,6 +47,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -352,6 +353,14 @@ public class HMSExternalTable extends ExternalTable { return client.getPartition(dbName, name, partitionValues); } + @Override + public Set getPartitionNames() { + makeSureInitialized(); + PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient(); + List names = client.listPartitionNames(dbName, name); + return new HashSet<>(names); + } + @Override public List initSchema() { makeSureInitialized(); @@ -387,13 +396,11 @@ public class HMSExternalTable extends ExternalTable { @Override public long estimatedRowCount() { - ColumnStatistic cache = Config.enable_stats - ? 
Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(id, "") - : ColumnStatistic.UNKNOWN; - if (cache.isUnKnown) { + try { + TableStatistic tableStatistic = StatisticsRepository.fetchTableLevelStats(id); + return tableStatistic.rowCount; + } catch (DdlException e) { return 1; - } else { - return (long) cache.count; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 8fb2ac6763..e2897516f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -348,6 +348,8 @@ public class SessionVariable implements Serializable, Writable { public static final String IGNORE_COMPLEX_TYPE_COLUMN = "ignore_column_with_complex_type"; + public static final String EXTERNAL_TABLE_ANALYZE_PART_NUM = "external_table_analyze_part_num"; + public static final List DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -953,6 +955,14 @@ public class SessionVariable implements Serializable, Writable { needForward = true) public boolean enableOrcLazyMat = true; + @VariableMgr.VarAttr( + name = EXTERNAL_TABLE_ANALYZE_PART_NUM, + description = {"收集外表统计信息行数时选取的采样分区数,默认-1表示全部分区", + "Number of sample partition for collecting external table line number, " + + "default -1 means all partitions"}, + needForward = false) + public int externalTableAnalyzePartNum = -1; + @VariableMgr.VarAttr( name = INLINE_CTE_REFERENCED_THRESHOLD ) @@ -1899,6 +1909,10 @@ public class SessionVariable implements Serializable, Writable { return showUserDefaultRole; } + public int getExternalTableAnalyzePartNum() { + return externalTableAnalyzePartNum; + } + /** * Serialize to thrift object. * Used for rest api. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java index dea2155199..5a00e3471e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfo.java @@ -17,12 +17,16 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; +import org.apache.doris.persist.gson.GsonUtils; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.gson.Gson; +import com.google.gson.annotations.SerializedName; import com.google.gson.reflect.TypeToken; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -34,7 +38,6 @@ import java.lang.reflect.Type; import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.StringJoiner; @@ -71,62 +74,94 @@ public class AnalysisInfo implements Writable { AUTOMATIC } + @SerializedName("jobId") public final long jobId; + @SerializedName("taskId") public final long taskId; + @SerializedName("catalogName") public final String catalogName; + @SerializedName("dbName") public final String dbName; + @SerializedName("tblName") public final String tblName; + @SerializedName("colToPartitions") public final Map> colToPartitions; + @SerializedName("partitionNames") + public final Set partitionNames; + + @SerializedName("colName") public final String colName; + @SerializedName("indexId") public final long indexId; + @SerializedName("jobType") public final JobType jobType; + @SerializedName("analysisMode") public final AnalysisMode analysisMode; + @SerializedName("analysisMethod") public final AnalysisMethod analysisMethod; 
+ @SerializedName("analysisType") public final AnalysisType analysisType; + @SerializedName("samplePercent") public final int samplePercent; + @SerializedName("sampleRows") public final int sampleRows; + @SerializedName("maxBucketNum") public final int maxBucketNum; + @SerializedName("periodTimeInMs") public final long periodTimeInMs; // finished or failed + @SerializedName("lastExecTimeInMs") public long lastExecTimeInMs; + @SerializedName("state") public AnalysisState state; + @SerializedName("scheduleType") public final ScheduleType scheduleType; + @SerializedName("message") public String message; // True means this task is a table level task for external table. // This kind of task is mainly to collect the number of rows of a table. + @SerializedName("externalTableLevelTask") public boolean externalTableLevelTask; + @SerializedName("partitionOnly") + public boolean partitionOnly; + + @SerializedName("samplingPartition") + public boolean samplingPartition; + public AnalysisInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, - Map> colToPartitions, String colName, Long indexId, JobType jobType, - AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType, + Map> colToPartitions, Set partitionNames, String colName, Long indexId, + JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType, int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message, - long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask) { + long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask, + boolean partitionOnly, boolean samplingPartition) { this.jobId = jobId; this.taskId = taskId; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; this.colToPartitions = colToPartitions; + this.partitionNames = partitionNames; this.colName = colName; 
this.indexId = indexId; this.jobType = jobType; @@ -142,6 +177,8 @@ public class AnalysisInfo implements Writable { this.state = state; this.scheduleType = scheduleType; this.externalTableLevelTask = isExternalTableLevelTask; + this.partitionOnly = partitionOnly; + this.samplingPartition = samplingPartition; } @Override @@ -257,71 +294,50 @@ public class AnalysisInfo implements Writable { @Override public void write(DataOutput out) throws IOException { - out.writeLong(jobId); - out.writeLong(taskId); - Text.writeString(out, catalogName); - Text.writeString(out, dbName); - Text.writeString(out, tblName); - out.writeInt(colToPartitions.size()); - for (Entry> entry : colToPartitions.entrySet()) { - Text.writeString(out, entry.getKey()); - out.writeInt(entry.getValue().size()); - for (String part : entry.getValue()) { - Text.writeString(out, part); - } - } - Text.writeString(out, colName); - out.writeLong(indexId); - Text.writeString(out, jobType.toString()); - Text.writeString(out, analysisMode.toString()); - Text.writeString(out, analysisMethod.toString()); - Text.writeString(out, analysisType.toString()); - out.writeInt(samplePercent); - out.writeInt(sampleRows); - out.writeInt(maxBucketNum); - out.writeLong(periodTimeInMs); - out.writeLong(lastExecTimeInMs); - Text.writeString(out, state.toString()); - Text.writeString(out, scheduleType.toString()); - Text.writeString(out, message); - out.writeBoolean(externalTableLevelTask); + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); } public static AnalysisInfo read(DataInput dataInput) throws IOException { - AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); - analysisInfoBuilder.setJobId(dataInput.readLong()); - long taskId = dataInput.readLong(); - analysisInfoBuilder.setTaskId(taskId); - analysisInfoBuilder.setCatalogName(Text.readString(dataInput)); - analysisInfoBuilder.setDbName(Text.readString(dataInput)); - analysisInfoBuilder.setTblName(Text.readString(dataInput)); - 
int size = dataInput.readInt(); - Map> colToPartitions = new HashMap<>(); - for (int i = 0; i < size; i++) { - String k = Text.readString(dataInput); - int partSize = dataInput.readInt(); - Set parts = new HashSet<>(); - for (int j = 0; j < partSize; j++) { - parts.add(Text.readString(dataInput)); + if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_123) { + AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder(); + analysisInfoBuilder.setJobId(dataInput.readLong()); + long taskId = dataInput.readLong(); + analysisInfoBuilder.setTaskId(taskId); + analysisInfoBuilder.setCatalogName(Text.readString(dataInput)); + analysisInfoBuilder.setDbName(Text.readString(dataInput)); + analysisInfoBuilder.setTblName(Text.readString(dataInput)); + int size = dataInput.readInt(); + Map> colToPartitions = new HashMap<>(); + for (int i = 0; i < size; i++) { + String k = Text.readString(dataInput); + int partSize = dataInput.readInt(); + Set parts = new HashSet<>(); + for (int j = 0; j < partSize; j++) { + parts.add(Text.readString(dataInput)); + } + colToPartitions.put(k, parts); } - colToPartitions.put(k, parts); + analysisInfoBuilder.setColToPartitions(colToPartitions); + analysisInfoBuilder.setColName(Text.readString(dataInput)); + analysisInfoBuilder.setIndexId(dataInput.readLong()); + analysisInfoBuilder.setJobType(JobType.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setAnalysisMode(AnalysisMode.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setAnalysisType(AnalysisType.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setSamplePercent(dataInput.readInt()); + analysisInfoBuilder.setSampleRows(dataInput.readInt()); + analysisInfoBuilder.setMaxBucketNum(dataInput.readInt()); + analysisInfoBuilder.setPeriodTimeInMs(dataInput.readLong()); + analysisInfoBuilder.setLastExecTimeInMs(dataInput.readLong()); + 
analysisInfoBuilder.setState(AnalysisState.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setScheduleType(ScheduleType.valueOf(Text.readString(dataInput))); + analysisInfoBuilder.setMessage(Text.readString(dataInput)); + analysisInfoBuilder.setExternalTableLevelTask(dataInput.readBoolean()); + return analysisInfoBuilder.build(); + } else { + String json = Text.readString(dataInput); + return GsonUtils.GSON.fromJson(json, AnalysisInfo.class); } - analysisInfoBuilder.setColToPartitions(colToPartitions); - analysisInfoBuilder.setColName(Text.readString(dataInput)); - analysisInfoBuilder.setIndexId(dataInput.readLong()); - analysisInfoBuilder.setJobType(JobType.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setAnalysisMode(AnalysisMode.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setAnalysisType(AnalysisType.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setSamplePercent(dataInput.readInt()); - analysisInfoBuilder.setSampleRows(dataInput.readInt()); - analysisInfoBuilder.setMaxBucketNum(dataInput.readInt()); - analysisInfoBuilder.setPeriodTimeInMs(dataInput.readLong()); - analysisInfoBuilder.setLastExecTimeInMs(dataInput.readLong()); - analysisInfoBuilder.setState(AnalysisState.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setScheduleType(ScheduleType.valueOf(Text.readString(dataInput))); - analysisInfoBuilder.setMessage(Text.readString(dataInput)); - analysisInfoBuilder.setExternalTableLevelTask(dataInput.readBoolean()); - return analysisInfoBuilder.build(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java index c47b1dc7ab..a14f262edc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisInfoBuilder.java @@ -33,6 +33,7 @@ public class AnalysisInfoBuilder { private String dbName; private String tblName; private Map> colToPartitions; + private Set partitionNames; private String colName; private long indexId = -1L; private JobType jobType; @@ -48,6 +49,8 @@ public class AnalysisInfoBuilder { private ScheduleType scheduleType; private String message = ""; private boolean externalTableLevelTask; + private boolean partitionOnly; + private boolean samplingPartition; public AnalysisInfoBuilder() { } @@ -59,6 +62,7 @@ public class AnalysisInfoBuilder { dbName = info.dbName; tblName = info.tblName; colToPartitions = info.colToPartitions; + partitionNames = info.partitionNames; colName = info.colName; indexId = info.indexId; jobType = info.jobType; @@ -73,6 +77,9 @@ public class AnalysisInfoBuilder { lastExecTimeInMs = info.lastExecTimeInMs; state = info.state; scheduleType = info.scheduleType; + externalTableLevelTask = info.externalTableLevelTask; + partitionOnly = info.partitionOnly; + samplingPartition = info.samplingPartition; } public AnalysisInfoBuilder setJobId(long jobId) { @@ -110,6 +117,11 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setPartitionNames(Set partitionNames) { + this.partitionNames = partitionNames; + return this; + } + public AnalysisInfoBuilder setIndexId(Long indexId) { this.indexId = indexId; return this; @@ -180,11 +192,21 @@ public class AnalysisInfoBuilder { return this; } + public AnalysisInfoBuilder setPartitionOnly(boolean isPartitionOnly) { + this.partitionOnly = isPartitionOnly; + return this; + } + + public AnalysisInfoBuilder setSamplingPartition(boolean samplingPartition) { + this.samplingPartition = samplingPartition; + return this; + } + public AnalysisInfo build() { - return new AnalysisInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions, + return new AnalysisInfo(jobId, taskId, catalogName, dbName, 
tblName, colToPartitions, partitionNames, colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent, sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType, - externalTableLevelTask); + externalTableLevelTask, partitionOnly, samplingPartition); } public AnalysisInfoBuilder copy() { @@ -209,6 +231,6 @@ public class AnalysisInfoBuilder { .setLastExecTimeInMs(lastExecTimeInMs) .setState(state) .setScheduleType(scheduleType) - .setExternalTableLevelTask(false); + .setExternalTableLevelTask(externalTableLevelTask); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java index 789478d501..18fcb8d82e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.catalog.View; +import org.apache.doris.catalog.external.HMSExternalTable; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; @@ -296,9 +297,8 @@ public class AnalysisManager extends Daemon implements Writable { * TODO Supports incremental collection of statistics from materialized views */ private Map> validateAndGetPartitions(TableIf table, Set columnNames, - AnalysisType analysisType, AnalysisMode analysisMode) throws DdlException { + Set partitionNames, AnalysisType analysisType, AnalysisMode analysisMode) throws DdlException { long tableId = table.getId(); - Set partitionNames = table.getPartitionNames(); Map> columnToPartitions = columnNames.stream() .collect(Collectors.toMap( @@ -312,6 +312,13 @@ public class AnalysisManager extends Daemon implements Writable { return 
columnToPartitions; } + if (table instanceof HMSExternalTable) { + // TODO Currently, we do not support INCREMENTAL collection for external table. + // One reason is external table partition id couldn't convert to a Long value. + // Will solve this problem later. + return columnToPartitions; + } + // Get the partition granularity statistics that have been collected Map> existColAndPartsForStats = StatisticsRepository .fetchColAndPartsForStats(tableId); @@ -365,6 +372,9 @@ public class AnalysisManager extends Daemon implements Writable { String tblName = tbl.getTbl(); TableIf table = stmt.getTable(); Set columnNames = stmt.getColumnNames(); + Set partitionNames = stmt.getPartitionNames(); + boolean partitionOnly = stmt.isPartitionOnly(); + boolean isSamplingPartition = stmt.isSamplingPartition(); int samplePercent = stmt.getSamplePercent(); int sampleRows = stmt.getSampleRows(); AnalysisType analysisType = stmt.getAnalysisType(); @@ -381,6 +391,9 @@ public class AnalysisManager extends Daemon implements Writable { stringJoiner.add(colName); } taskInfoBuilder.setColName(stringJoiner.toString()); + taskInfoBuilder.setPartitionNames(partitionNames); + taskInfoBuilder.setPartitionOnly(partitionOnly); + taskInfoBuilder.setSamplingPartition(isSamplingPartition); taskInfoBuilder.setJobType(JobType.MANUAL); taskInfoBuilder.setState(AnalysisState.PENDING); taskInfoBuilder.setAnalysisType(analysisType); @@ -406,8 +419,8 @@ public class AnalysisManager extends Daemon implements Writable { taskInfoBuilder.setPeriodTimeInMs(periodTimeInMs); } - Map> colToPartitions = validateAndGetPartitions(table, - columnNames, analysisType, analysisMode); + Map> colToPartitions = validateAndGetPartitions(table, columnNames, + partitionNames, analysisType, analysisMode); taskInfoBuilder.setColToPartitions(colToPartitions); return taskInfoBuilder.build(); @@ -433,8 +446,8 @@ public class AnalysisManager extends Daemon implements Writable { try { TableIf table = StatisticsUtil 
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName); - Map> colToPartitions = validateAndGetPartitions(table, - jobInfo.colToPartitions.keySet(), jobInfo.analysisType, jobInfo.analysisMode); + Map> colToPartitions = validateAndGetPartitions(table, jobInfo.colToPartitions.keySet(), + jobInfo.partitionNames, jobInfo.analysisType, jobInfo.analysisMode); taskInfoBuilder.setColToPartitions(colToPartitions); } catch (Throwable e) { throw new RuntimeException(e); @@ -547,6 +560,10 @@ public class AnalysisManager extends Daemon implements Writable { AnalysisInfo analysisInfo = colTaskInfoBuilder.setIndexId(-1L) .setTaskId(taskId).setExternalTableLevelTask(true).build(); analysisTasks.put(taskId, createTask(analysisInfo)); + if (isSync) { + // For sync job, don't need to persist, return here and execute it immediately. + return; + } try { logCreateAnalysisJob(analysisInfo); } catch (Exception e) { @@ -604,7 +621,8 @@ public class AnalysisManager extends Daemon implements Writable { updateOlapTableStats(table, params); } - // TODO support external table + // External Table doesn't collect table stats here. + // We create task for external table to collect table/partition level statistics. } @SuppressWarnings("rawtypes") diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java index 5735c01430..8aecb8bb4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/ColumnStatistic.java @@ -147,9 +147,10 @@ public class ColumnStatistic { String colName = resultRow.getColumnValue("col_id"); Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName); if (col == null) { - // Col is null indicates this information is external table level info, - // which doesn't have a column. 
- return columnStatisticBuilder.build(); + LOG.warn("Failed to deserialize column statistics, ctlId: {} dbId: {}" + + "tblId: {} column: {} not exists", + catalogId, dbID, tblId, colName); + return ColumnStatistic.UNKNOWN; } String min = resultRow.getColumnValue("min"); String max = resultRow.getColumnValue("max"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 0e358857ca..be1fd516af 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -20,13 +20,14 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.StmtExecutor; import org.apache.doris.statistics.util.InternalQueryResult; import org.apache.doris.statistics.util.StatisticsUtil; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; @@ -50,6 +51,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class HiveAnalysisTask extends HMSAnalysisTask { private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class); @@ -83,10 +85,16 @@ public class HiveAnalysisTask extends HMSAnalysisTask { + "FROM `${catalogName}`.`${dbName}`.`${tblName}`"; private final boolean isTableLevelTask; + private final boolean isSamplingPartition; + private final boolean isPartitionOnly; + private final Set 
partitionNames; public HiveAnalysisTask(AnalysisInfo info) { super(info); isTableLevelTask = info.externalTableLevelTask; + isSamplingPartition = info.samplingPartition; + isPartitionOnly = info.partitionOnly; + partitionNames = info.partitionNames; } private static final String ANALYZE_META_TABLE_COLUMN_TEMPLATE = "INSERT INTO " @@ -109,54 +117,144 @@ public class HiveAnalysisTask extends HMSAnalysisTask { */ @Override protected void getStatsBySql() throws Exception { - getTableStatsBySql(); - getPartitionStatsBySql(); - getTableColumnStatsBySql(); - getPartitionColumnStatsBySql(); + if (isTableLevelTask) { + getTableStatsBySql(); + } else { + getTableColumnStatsBySql(); + } } /** * Get table row count and insert the result to __internal_schema.table_statistics */ private void getTableStatsBySql() throws Exception { - Map params = buildTableStatsParams(); - List columnResult = - StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) - .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); - String rowCount = columnResult.get(0).getColumnValue("rowCount"); - params.put("rowCount", rowCount); - StatisticsRepository.persistTableStats(params); + // Get table level information. 
An example sql for table stats: + // INSERT INTO __internal_schema.table_statistics VALUES + // ('13055', 13002, 13038, 13055, -1, 'NULL', 5, 1686111064658, NOW()) + Map parameters = table.getRemoteTable().getParameters(); + if (isPartitionOnly) { + for (String partId : partitionNames) { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_TABLE_COUNT_TEMPLATE); + sb.append(" where "); + String[] splits = partId.split("/"); + for (int i = 0; i < splits.length; i++) { + String value = splits[i].split("=", 2)[1]; + splits[i] = splits[i].split("=", 2)[0] + "=\'" + value + "\'"; + } + sb.append(StringUtils.join(splits, " and ")); + Map params = buildTableStatsParams(partId); + setParameterData(parameters, params); + List columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(sb.toString())); + String rowCount = columnResult.get(0).getColumnValue("rowCount"); + params.put("rowCount", rowCount); + StatisticsRepository.persistTableStats(params); + } + } else { + Map params = buildTableStatsParams("NULL"); + List columnResult = + StatisticsUtil.execStatisticQuery(new StringSubstitutor(params) + .replace(ANALYZE_TABLE_COUNT_TEMPLATE)); + String rowCount = columnResult.get(0).getColumnValue("rowCount"); + params.put("rowCount", rowCount); + StatisticsRepository.persistTableStats(params); + } + } /** * Get column statistics and insert the result to __internal_schema.column_statistics */ private void getTableColumnStatsBySql() throws Exception { - Map params = buildTableStatsParams(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("colName", col.getName()); - params.put("colId", info.colName); - params.put("dataSizeFunction", getDataSizeFunction(col)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_SQL_TABLE_TEMPLATE); - try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); + // An example sql for a column stats: + // INSERT INTO __internal_schema.column_statistics + // SELECT CONCAT(13055, '-', -1, '-', 'r_regionkey') AS id, + // 13002 AS catalog_id, + // 13038 AS db_id, + // 13055 AS tbl_id, + // -1 AS idx_id, + // 'r_regionkey' AS col_id, + // 'NULL' AS part_id, + // COUNT(1) AS row_count, + // NDV(`r_regionkey`) AS ndv, + // SUM(CASE WHEN `r_regionkey` IS NULL THEN 1 ELSE 0 END) AS null_count, + // MIN(`r_regionkey`) AS min, + // MAX(`r_regionkey`) AS max, + // 0 AS data_size, + // NOW() FROM `hive`.`tpch100`.`region` + if (isPartitionOnly) { + for (String partId : partitionNames) { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_SQL_TABLE_TEMPLATE); + sb.append(" where "); + String[] splits = partId.split("/"); + for (int i = 0; i < splits.length; i++) { + String value = splits[i].split("=")[1]; + splits[i] = splits[i].replace(value, "\'" + value + "\'"); + } + sb.append(StringUtils.join(splits, " and ")); + Map params = buildTableStatsParams(partId); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(sb.toString()); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + } + } else { + StringBuilder sb = new StringBuilder(); + sb.append(ANALYZE_SQL_TABLE_TEMPLATE); + if (isSamplingPartition) { + sb.append(" 
where 1=1 "); + String[] splitExample = partitionNames.stream().findFirst().get().split("/"); + int parts = splitExample.length; + List partNames = new ArrayList<>(); + for (String split : splitExample) { + partNames.add(split.split("=")[0]); + } + List> valueLists = new ArrayList<>(); + for (int i = 0; i < parts; i++) { + valueLists.add(new ArrayList<>()); + } + for (String partId : partitionNames) { + String[] partIds = partId.split("/"); + for (int i = 0; i < partIds.length; i++) { + valueLists.get(i).add("\'" + partIds[i].split("=")[1] + "\'"); + } + } + for (int i = 0; i < parts; i++) { + sb.append(" and "); + sb.append(partNames.get(i)); + sb.append(" in ("); + sb.append(StringUtils.join(valueLists.get(i), ",")); + sb.append(") "); + } + } + Map params = buildTableStatsParams("NULL"); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("colName", col.getName()); + params.put("colId", info.colName); + params.put("dataSizeFunction", getDataSizeFunction(col)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(sb.toString()); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); } - Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName()); } - private void getPartitionStatsBySql() { - // TODO: Collect partition stats by sql. - } - - private void getPartitionColumnStatsBySql() { - // TODO: Collect partition column stats by sql. 
- } - - private Map buildTableStatsParams() { + private Map buildTableStatsParams(String partId) { Map commonParams = new HashMap<>(); commonParams.put("id", String.valueOf(tbl.getId())); commonParams.put("catalogId", String.valueOf(catalog.getId())); @@ -164,11 +262,13 @@ public class HiveAnalysisTask extends HMSAnalysisTask { commonParams.put("tblId", String.valueOf(tbl.getId())); commonParams.put("indexId", "-1"); commonParams.put("idxId", "-1"); - commonParams.put("partId", "NULL"); + commonParams.put("partId", "\'" + partId + "\'"); commonParams.put("catalogName", catalog.getName()); commonParams.put("dbName", db.getFullName()); commonParams.put("tblName", tbl.getName()); - commonParams.put("type", col.getType().toString()); + if (col != null) { + commonParams.put("type", col.getType().toString()); + } commonParams.put("lastAnalyzeTimeInMs", String.valueOf(System.currentTimeMillis())); return commonParams; } @@ -183,28 +283,20 @@ public class HiveAnalysisTask extends HMSAnalysisTask { } protected void getTableStatsByMeta() throws Exception { - Map params = new HashMap<>(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("catalogId", String.valueOf(catalog.getId())); - params.put("dbId", String.valueOf(db.getId())); - params.put("tblId", String.valueOf(tbl.getId())); - params.put("colId", ""); - // Get table level information. Map parameters = table.getRemoteTable().getParameters(); - // Collect table level row count, null number and timestamp. 
- setParameterData(parameters, params); - if (parameters.containsKey(TOTAL_SIZE)) { - params.put("dataSize", parameters.get(TOTAL_SIZE)); - } - params.put("id", genColumnStatId(tbl.getId(), -1, "", null)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_TEMPLATE); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); + if (isPartitionOnly) { + for (String partId : partitionNames) { + Map params = buildTableStatsParams(partId); + // Collect table level row count, null number and timestamp. + setParameterData(parameters, params); + StatisticsRepository.persistTableStats(params); + } + } else { + Map params = buildTableStatsParams("NULL"); + // Collect table level row count, null number and timestamp. + setParameterData(parameters, params); + StatisticsRepository.persistTableStats(params); } } @@ -221,31 +313,34 @@ public class HiveAnalysisTask extends HMSAnalysisTask { // Get table level information. Map parameters = table.getRemoteTable().getParameters(); - // Collect table level row count, null number and timestamp. - setParameterData(parameters, params); - params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(), null)); - List tableStats = table.getHiveTableColumnStats(columns); - long rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0; - // Collect table level ndv, nulls, min and max. tableStats contains at most 1 item; - for (ColumnStatisticsObj tableStat : tableStats) { - if (!tableStat.isSetStatsData()) { - continue; + long rowCount; + StringSubstitutor stringSubstitutor; + if (isPartitionOnly) { + // Collect table level row count, null number and timestamp. 
+ setParameterData(parameters, params); + params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(), null)); + List tableStats = table.getHiveTableColumnStats(columns); + rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0; + // Collect table level ndv, nulls, min and max. tableStats contains at most 1 item; + for (ColumnStatisticsObj tableStat : tableStats) { + if (!tableStat.isSetStatsData()) { + continue; + } + ColumnStatisticsData data = tableStat.getStatsData(); + getStatData(data, params, rowCount); + } + stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_COLUMN_TEMPLATE); + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); } - ColumnStatisticsData data = tableStat.getStatsData(); - getStatData(data, params, rowCount); - } - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_META_TABLE_COLUMN_TEMPLATE); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - r.connectContext.getSessionVariable().disableNereidsPlannerOnce(); - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); } // Get partition level information. 
- List partitions = ((HMSExternalCatalog) - catalog).getClient().listPartitionNames(db.getFullName(), table.getName()); - Map> columnStats = table.getHivePartitionColumnStats(partitions, columns); + Map> columnStats + = table.getHivePartitionColumnStats(Lists.newArrayList(partitionNames), columns); List partitionAnalysisSQLs = new ArrayList<>(); for (Map.Entry> entry : columnStats.entrySet()) { String partName = entry.getKey(); @@ -368,6 +463,7 @@ public class HiveAnalysisTask extends HMSAnalysisTask { timestamp = parameters.get(TIMESTAMP); } params.put("numRows", numRows); + params.put("rowCount", numRows); params.put("update_time", TimeUtils.DATETIME_FORMAT.format( LocalDateTime.ofInstant(Instant.ofEpochMilli(Long.parseLong(timestamp) * 1000), ZoneId.systemDefault())));