diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index 446de34858..f99e172c56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -19,10 +19,12 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.ErrorCode; @@ -31,6 +33,7 @@ import org.apache.doris.common.FeNameFormat; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.Util; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; @@ -202,17 +205,13 @@ public class AnalyzeStmt extends DdlStmt { if (optTableName != null) { optTableName.analyze(analyzer); - // disallow external catalog - Util.prohibitExternalCatalog(optTableName.getCtl(), - this.getClass().getSimpleName()); - + String catalogName = optTableName.getCtl(); String dbName = optTableName.getDb(); String tblName = optTableName.getTbl(); - Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName); - Table table = db.getTableOrAnalysisException(tblName); + CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName); + DatabaseIf db = catalog.getDbOrAnalysisException(dbName); + TableIf table = db.getTableOrAnalysisException(tblName); - // external table is not supported - checkAnalyzeType(table); checkAnalyzePriv(dbName, tblName); if (optColumnNames != null && !optColumnNames.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index f719dea404..03f60633fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -46,6 +46,9 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; +import org.apache.doris.statistics.AnalysisJob; +import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisJobScheduler; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TCompressionType; @@ -976,6 +979,11 @@ public class OlapTable extends Table { return tTableDescriptor; } + @Override + public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + return new AnalysisJob(scheduler, info); + } + @Override public long getRowCount() { long rowCount = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 047f30dee5..c45feb7a57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ 
-26,6 +26,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.external.hudi.HudiTable; +import org.apache.doris.statistics.AnalysisJob; +import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisJobScheduler; import org.apache.doris.thrift.TTableDescriptor; import com.google.common.base.Preconditions; @@ -503,4 +506,9 @@ public abstract class Table extends MetaObject implements Writable, TableIf { public Set getPartitionNames() { return Collections.EMPTY_SET; } + + @Override + public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + throw new NotImplementedException(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 24a93c2df8..9b831bb524 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -20,6 +20,9 @@ package org.apache.doris.catalog; import org.apache.doris.alter.AlterCancelException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.statistics.AnalysisJob; +import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisJobScheduler; import org.apache.doris.thrift.TTableDescriptor; import java.util.Collections; @@ -108,6 +111,8 @@ public interface TableIf { TTableDescriptor toThrift(); + AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info); + /** * Doris table type. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index e97c5b1a26..eeca74ff5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -29,6 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; +import org.apache.doris.statistics.AnalysisJob; +import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisJobScheduler; import org.apache.doris.thrift.TTableDescriptor; import com.google.gson.annotations.SerializedName; @@ -297,6 +300,11 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { return null; } + @Override + public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + throw new NotImplementedException(); + } + @Override public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index 761ca278d3..42a1091252 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -21,13 +21,21 @@ import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Type; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.HMSExternalCatalog; 
+import org.apache.doris.datasource.PooledHiveMetaStoreClient;
+import org.apache.doris.statistics.AnalysisJob;
+import org.apache.doris.statistics.AnalysisJobInfo;
+import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.HiveAnalysisJob;
+import org.apache.doris.statistics.IcebergAnalysisJob;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -266,6 +274,19 @@ public class HMSExternalTable extends ExternalTable {
         return tTableDescriptor;
     }
 
+    @Override
+    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
+        makeSureInitialized();
+        switch (dlaType) {
+            case HIVE:
+                return new HiveAnalysisJob(scheduler, info);
+            case ICEBERG:
+                return new IcebergAnalysisJob(scheduler, info);
+            default:
+                throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
+        }
+    }
+
     public String getMetastoreUri() {
         return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
     }
@@ -277,5 +298,21 @@ public class HMSExternalTable extends ExternalTable {
     public Map<String, String> getS3Properties() {
         return catalog.getCatalogProperty().getS3Properties();
     }
+
+    public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String> columns) {
+        PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
+        return client.getTableColumnStatistics(dbName, name, columns);
+    }
+
+    public Map<String, List<ColumnStatisticsObj>> getHivePartitionColumnStats(
+            List<String> partNames, List<String> columns) {
+        PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
+        return client.getPartitionColumnStatistics(dbName, name, partNames, columns);
+    }
+
+    public Partition getPartition(List<String> partitionValues) {
+        PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
+        return client.getPartition(dbName, name, partitionValues);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 49da79a33a..75f9651ce7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1909,5 +1909,12 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = false)
     public static boolean use_fuzzy_session_variable = false;
+
+    /**
+     * Collect external table statistics by running SQL when set to true.
+     * Otherwise, collect statistics from the external catalog metadata.
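+     * Note: as of this patch the SQL-based path is not implemented yet
+     * (HMSAnalysisJob.getColumnStatsBySql throws NotImplementedException).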
+     */
+    @ConfField(mutable = true)
+    public static boolean collect_external_table_stats_by_sql = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
index be7e54eba1..6698022184 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/PooledHiveMetaStoreClient.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -37,6 +38,7 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -126,6 +128,23 @@ public class PooledHiveMetaStoreClient {
         }
     }
 
+    public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
+        try (CachedClient client = getClient()) {
+            return client.client.getTableColumnStatistics(dbName, tblName, columns);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
+            String dbName, String tblName, List<String> partNames, List<String> columns) {
+        try (CachedClient client = getClient()) {
+            return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     private class CachedClient implements AutoCloseable {
         private final IMetaStoreClient client;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
index 83ac6e11a9..880a91580a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.qe.ConnectContext;
@@ -40,17 +41,17 @@ public class AnalysisJob {
 
     private final AnalysisJobScheduler analysisJobScheduler;
 
-    private final AnalysisJobInfo info;
+    protected final AnalysisJobInfo info;
 
-    private CatalogIf catalog;
+    protected CatalogIf catalog;
 
-    private DatabaseIf db;
+    protected DatabaseIf db;
 
-    private TableIf tbl;
+    protected TableIf tbl;
 
-    private Column col;
+    protected Column col;
 
-    private StmtExecutor stmtExecutor;
+    protected StmtExecutor stmtExecutor;
 
     public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
         this.analysisJobScheduler = analysisJobScheduler;
@@ -65,13 +66,13 @@ public class AnalysisJob {
                     String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
             return;
         }
-        db = catalog.getDb(info.dbName).orElse(null);
+        db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
         if (db == null) {
             analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis()); return; } - tbl = db.getTable(info.tblName).orElse(null); + tbl = (TableIf) db.getTable(info.tblName).orElse(null); if (tbl == null) { analysisJobScheduler.updateJobStatus( info.jobId, JobState.FAILED, @@ -151,13 +152,13 @@ public class AnalysisJob { List partitionAnalysisSQLs = new ArrayList<>(); try { tbl.readLock(); - Set partNames = tbl.getPartitionNames(); + Set partNames = ((Table) tbl).getPartitionNames(); for (String partName : partNames) { - Partition part = tbl.getPartition(partName); + Partition part = ((Table) tbl).getPartition(partName); if (part == null) { continue; } - params.put("partId", String.valueOf(tbl.getPartition(partName).getId())); + params.put("partId", String.valueOf(((Table) tbl).getPartition(partName).getId())); params.put("partName", String.valueOf(partName)); StringSubstitutor stringSubstitutor = new StringSubstitutor(params); partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java index 734aa259e8..a4a916471f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java @@ -17,10 +17,15 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; import org.apache.doris.statistics.AnalysisJobInfo.JobState; import org.apache.doris.statistics.AnalysisJobInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; +import com.google.common.base.Preconditions; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; import org.apache.log4j.LogManager; @@ -80,7 +85,13 @@ public class AnalysisJobScheduler { } public synchronized void schedule(AnalysisJobInfo analysisJobInfo) { - AnalysisJob analysisJob = new AnalysisJob(this, analysisJobInfo); + CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName); + Preconditions.checkArgument(catalog != null); + DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName); + Preconditions.checkArgument(db != null); + TableIf table = db.getTableNullable(analysisJobInfo.tblName); + Preconditions.checkArgument(table != null); + AnalysisJob analysisJob = table.createAnalysisJob(this, analysisJobInfo); addToManualJobQueue(analysisJob); if (analysisJobInfo.jobType.equals(JobType.MANUAL)) { return; diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java new file mode 100644 index 0000000000..c92d92dac9 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import org.apache.doris.catalog.external.HMSExternalTable;
+import org.apache.doris.common.Config;
+
+import org.apache.commons.lang.NotImplementedException;
+
+public class HMSAnalysisJob extends AnalysisJob {
+
+    protected HMSExternalTable table;
+
+    public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
+        super(analysisJobScheduler, info);
+        table = (HMSExternalTable) tbl;
+    }
+
+    /**
+     * Collect column level stats for the external table from catalog metadata.
+     */
+    protected void getColumnStatsByMeta() throws Exception {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Collect column level stats for the external table by executing SQL.
+     */
+    protected void getColumnStatsBySql() {
+        throw new NotImplementedException();
+    }
+
+    @Override
+    public void execute() throws Exception {
+        if (Config.collect_external_table_stats_by_sql) {
+            getColumnStatsBySql();
+        } else {
+            getColumnStatsByMeta();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java
new file mode 100644
index 0000000000..d098c22026
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
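+// HiveAnalysisJob collects table-level and partition-level column statistics
+// (row count, NDV, null count, min/max, data size) for a Hive table from Hive
+// Metastore metadata and writes them into the internal column statistics table
+// via the INSERT templates below.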
+ +package org.apache.doris.statistics; + +import org.apache.doris.datasource.HMSExternalCatalog; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.base.Preconditions; +import org.apache.commons.text.StringSubstitutor; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData; +import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; +import org.apache.hadoop.hive.metastore.api.DateColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData; +import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData; +import org.apache.hadoop.hive.metastore.api.LongColumnStatsData; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class HiveAnalysisJob extends HMSAnalysisJob { + private static final Logger LOG = LogManager.getLogger(HiveAnalysisJob.class); + + public static final String TOTAL_SIZE = "totalSize"; + public static final String NUM_ROWS = "numRows"; + public static final String NUM_FILES = "numFiles"; + public static final String TIMESTAMP = "transient_lastDdlTime"; + + public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { + super(analysisJobScheduler, info); + } + + private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', '${partId}', " + + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', NULL, " + + "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')"; + + @Override + protected void getColumnStatsByMeta() throws Exception { + List columns = new ArrayList<>(); + columns.add(col.getName()); + Map params = new HashMap<>(); + params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(tbl.getId())); + params.put("colId", String.valueOf(col.getName())); + + // Get table level information. + Map parameters = table.getRemoteTable().getParameters(); + // Collect table level row count, null number and timestamp. + setParameterData(parameters, params); + params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName())); + List tableStats = table.getHiveTableColumnStats(columns); + // Collect table level ndv, nulls, min and max. 
tableStats contains at most one item.
+        for (ColumnStatisticsObj tableStat : tableStats) {
+            if (!tableStat.isSetStatsData()) {
+                continue;
+            }
+            ColumnStatisticsData data = tableStat.getStatsData();
+            getStatData(data, params);
+        }
+        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
+        String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
+        ConnectContext connectContext = StatisticsUtil.buildConnectContext();
+        this.stmtExecutor = new StmtExecutor(connectContext, sql);
+        this.stmtExecutor.execute();
+
+        // Get partition level information.
+        List<String> partitions = ((HMSExternalCatalog)
+                catalog).getClient().listPartitionNames(db.getFullName(), table.getName());
+        Map<String, List<ColumnStatisticsObj>> columnStats = table.getHivePartitionColumnStats(partitions, columns);
+        List<String> partitionAnalysisSQLs = new ArrayList<>();
+        for (Map.Entry<String, List<ColumnStatisticsObj>> entry : columnStats.entrySet()) {
+            String partName = entry.getKey();
+            List<String> partitionValues = new ArrayList<>();
+            for (String p : partName.split("/")) {
+                partitionValues.add(p.split("=")[1]);
+            }
+            Partition partition = table.getPartition(partitionValues);
+            parameters = partition.getParameters();
+            // Collect row count, null number and timestamp.
+            setParameterData(parameters, params);
+            params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName()) + "-" + partName);
+            params.put("partId", partName);
+            List<ColumnStatisticsObj> value = entry.getValue();
+            Preconditions.checkState(value.size() == 1);
+            ColumnStatisticsObj stat = value.get(0);
+            if (!stat.isSetStatsData()) {
+                continue;
+            }
+            // Collect ndv, nulls, min and max for different data types.
+            ColumnStatisticsData data = stat.getStatsData();
+            getStatData(data, params);
+            stringSubstitutor = new StringSubstitutor(params);
+            partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
+        }
+        // Update partition level stats for this column.
+        for (String partitionSql : partitionAnalysisSQLs) {
+            connectContext = StatisticsUtil.buildConnectContext();
+            this.stmtExecutor = new StmtExecutor(connectContext, partitionSql);
+            this.stmtExecutor.execute();
+        }
+    }
+
+    private void getStatData(ColumnStatisticsData data, Map<String, String> params) {
+        long ndv = 0;
+        long nulls = 0;
+        String min;
+        String max;
+        // Collect ndv, nulls, min and max for different data types.
+        if (data.isSetLongStats()) {
+            LongColumnStatsData longStats = data.getLongStats();
+            ndv = longStats.getNumDVs();
+            nulls = longStats.getNumNulls();
+            min = String.valueOf(longStats.getLowValue());
+            max = String.valueOf(longStats.getHighValue());
+        } else if (data.isSetStringStats()) {
+            StringColumnStatsData stringStats = data.getStringStats();
+            ndv = stringStats.getNumDVs();
+            nulls = stringStats.getNumNulls();
+            min = "No value";
+            max = String.valueOf(stringStats.getMaxColLen());
+        } else if (data.isSetDecimalStats()) {
+            // TODO: Need a more accurate way to collect decimal values.
+            DecimalColumnStatsData decimalStats = data.getDecimalStats();
+            ndv = decimalStats.getNumDVs();
+            nulls = decimalStats.getNumNulls();
+            min = decimalStats.getLowValue().toString();
+            max = decimalStats.getHighValue().toString();
+        } else if (data.isSetDoubleStats()) {
+            DoubleColumnStatsData doubleStats = data.getDoubleStats();
+            ndv = doubleStats.getNumDVs();
+            nulls = doubleStats.getNumNulls();
+            min = String.valueOf(doubleStats.getLowValue());
+            max = String.valueOf(doubleStats.getHighValue());
+        } else if (data.isSetDateStats()) {
+            // TODO: Need a more accurate way to collect date values.
+ DateColumnStatsData dateStats = data.getDateStats(); + ndv = dateStats.getNumDVs(); + nulls = dateStats.getNumNulls(); + min = dateStats.getLowValue().toString(); + max = dateStats.getHighValue().toString(); + } else { + throw new RuntimeException("Not supported data type."); + } + params.put("ndv", String.valueOf(ndv)); + params.put("nulls", String.valueOf(nulls)); + params.put("min", min); + params.put("max", max); + } + + private void setParameterData(Map parameters, Map params) { + long numRows = 0; + long timestamp = 0; + long dataSize = 0; + if (parameters.containsKey(NUM_ROWS)) { + numRows = Long.parseLong(parameters.get(NUM_ROWS)); + } + if (parameters.containsKey(TIMESTAMP)) { + timestamp = Long.parseLong(parameters.get(TIMESTAMP)); + } + if (parameters.containsKey(TOTAL_SIZE)) { + dataSize = Long.parseLong(parameters.get(TOTAL_SIZE)); + } + params.put("dataSize", String.valueOf(dataSize)); + params.put("numRows", String.valueOf(numRows)); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + params.put("update_time", sdf.format(new Date(timestamp * 1000))); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java new file mode 100644 index 0000000000..9ed17f7230 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
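+// IcebergAnalysisJob collects table-level column statistics for an Iceberg table
+// registered in HMS: it scans data file metadata (column sizes, record counts and
+// null value counts) via the Iceberg API and writes a table-level row for the
+// analyzed column into the internal column statistics table.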
+ +package org.apache.doris.statistics; + +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.commons.text.StringSubstitutor; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.types.Types; + +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +public class IcebergAnalysisJob extends HMSAnalysisJob { + + private long numRows = 0; + private long dataSize = 0; + private long numNulls = 0; + + public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { + super(analysisJobScheduler, info); + } + + private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', NULL, " + + "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')"; + + + @Override + protected void getColumnStatsByMeta() throws Exception { + Table icebergTable = getIcebergTable(); + TableScan tableScan = icebergTable.newScan().includeColumnStats(); + for (FileScanTask task : tableScan.planFiles()) { + processDataFile(task.file(), task.spec()); + } + updateStats(); + } + + private Table getIcebergTable() { + org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog(); + Configuration conf = new HdfsConfiguration(); + for (Map.Entry entry : table.getCatalog().getCatalogProperty().getProperties().entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + Map s3Properties = table.getS3Properties(); + for (Map.Entry entry : s3Properties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + hiveCatalog.setConf(conf); + Map catalogProperties = new HashMap<>(); + catalogProperties.put("hive.metastore.uris", table.getMetastoreUri()); + catalogProperties.put("uri", table.getMetastoreUri()); + hiveCatalog.initialize("hive", catalogProperties); + return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName())); + } + + private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) { + int colId = -1; + for (Types.NestedField column : partitionSpec.schema().columns()) { + if (column.name().equals(col.getName())) { + colId = column.fieldId(); + break; + } + } + if (colId == -1) { + throw new RuntimeException(String.format("Column %s not exist.", col.getName())); + } + dataSize += dataFile.columnSizes().get(colId); + numRows += dataFile.recordCount(); + numNulls += dataFile.nullValueCounts().get(colId); + } + + private void updateStats() throws Exception { + Map params = new HashMap<>(); + params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName())); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(tbl.getId())); + params.put("colId", String.valueOf(col.getName())); + params.put("numRows", String.valueOf(numRows)); + params.put("nulls", String.valueOf(numNulls)); + 
params.put("dataSize", String.valueOf(dataSize)); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + params.put("update_time", sdf.format(new Date())); + + // Update table level stats info of this column. + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE); + ConnectContext connectContext = StatisticsUtil.buildConnectContext(); + this.stmtExecutor = new StmtExecutor(connectContext, sql); + this.stmtExecutor.execute(); + } +}