[feature-wip](multi-catalog) Collect external table statistics (#14160)

Collect HMS external table statistics through the external catalog metadata.
Insert the results into __internal_schema.column_statistics with INSERT INTO SQL.
Jibing-Li
2022-11-17 20:41:09 +08:00
committed by GitHub
parent 44ee4386f7
commit ccf4db394c
13 changed files with 503 additions and 20 deletions
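For context (this paragraph and the sample below are editorial, not part of the diff): the statistics rows land in __internal_schema.column_statistics, one row per column at table level (partId is NULL) and one per partition. Based on the INSERT templates added in HiveAnalysisJob further down, a rendered table-level statement would look roughly like the following; every id and value here is hypothetical.

INSERT INTO __internal_schema.column_statistics
values ('12345-l_orderkey', '10001', '10002', '12345', 'l_orderkey', NULL,
        6001215, 1500000, 0, '1', '6000000', 75000000, '2022-11-17 00:00:00');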

View File

@@ -19,10 +19,12 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
@@ -31,6 +33,7 @@ import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
@@ -202,17 +205,13 @@ public class AnalyzeStmt extends DdlStmt {
if (optTableName != null) {
optTableName.analyze(analyzer);
// disallow external catalog
Util.prohibitExternalCatalog(optTableName.getCtl(),
this.getClass().getSimpleName());
String catalogName = optTableName.getCtl();
String dbName = optTableName.getDb();
String tblName = optTableName.getTbl();
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
Table table = db.getTableOrAnalysisException(tblName);
CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName);
// external table is not supported
checkAnalyzeType(table);
checkAnalyzePriv(dbName, tblName);
if (optColumnNames != null && !optColumnNames.isEmpty()) {
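The statement now resolves the target table through the catalog manager instead of going straight to the internal catalog. This is the same catalog/db/table lookup the AnalysisJobScheduler change below relies on; a compact editorial sketch of that lookup, using the nullable-getter variants from this commit and hypothetical names (not a verbatim excerpt):

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;

public class CatalogLookupSketch {
    // Resolve catalog -> db -> table; returns null if any level is missing.
    static TableIf resolve(String catalogName, String dbName, String tblName) {
        CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
        if (catalog == null) {
            return null;
        }
        DatabaseIf db = catalog.getDbNullable(dbName);
        if (db == null) {
            return null;
        }
        return db.getTableNullable(tblName);
    }
}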

View File

@@ -46,6 +46,9 @@ import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCompressionType;
@@ -976,6 +979,11 @@ public class OlapTable extends Table {
return tTableDescriptor;
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
return new AnalysisJob(scheduler, info);
}
@Override
public long getRowCount() {
long rowCount = 0;

View File

@@ -26,6 +26,9 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.base.Preconditions;
@@ -503,4 +506,9 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
public Set<String> getPartitionNames() {
return Collections.EMPTY_SET;
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
throw new NotImplementedException();
}
}

View File

@@ -20,6 +20,9 @@ package org.apache.doris.catalog;
import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.thrift.TTableDescriptor;
import java.util.Collections;
@@ -108,6 +111,8 @@ public interface TableIf {
TTableDescriptor toThrift();
AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info);
/**
* Doris table type.
*/

View File

@@ -29,6 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.gson.annotations.SerializedName;
@@ -297,6 +300,11 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
return null;
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
throw new NotImplementedException();
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);

View File

@@ -21,13 +21,21 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.PooledHiveMetaStoreClient;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.HiveAnalysisJob;
import org.apache.doris.statistics.IcebergAnalysisJob;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -266,6 +274,19 @@ public class HMSExternalTable extends ExternalTable {
return tTableDescriptor;
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
makeSureInitialized();
switch (dlaType) {
case HIVE:
return new HiveAnalysisJob(scheduler, info);
case ICEBERG:
return new IcebergAnalysisJob(scheduler, info);
default:
throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
}
}
public String getMetastoreUri() {
return ((HMSExternalCatalog) catalog).getHiveMetastoreUris();
}
@@ -277,5 +298,21 @@ public class HMSExternalTable extends ExternalTable {
public Map<String, String> getS3Properties() {
return catalog.getCatalogProperty().getS3Properties();
}
public List<ColumnStatisticsObj> getHiveTableColumnStats(List<String> columns) {
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
return client.getTableColumnStatistics(dbName, name, columns);
}
public Map<String, List<ColumnStatisticsObj>> getHivePartitionColumnStats(
List<String> partNames, List<String> columns) {
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
return client.getPartitionColumnStatistics(dbName, name, partNames, columns);
}
public Partition getPartition(List<String> partitionValues) {
PooledHiveMetaStoreClient client = ((HMSExternalCatalog) catalog).getClient();
return client.getPartition(dbName, name, partitionValues);
}
}
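A minimal usage sketch of the new stats helpers (editorial, not from the commit); it assumes an already-resolved HMSExternalTable plus hypothetical partition and column lists, and both calls go through the catalog's PooledHiveMetaStoreClient shown later in this diff:

import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;

import java.util.List;
import java.util.Map;

public class HmsColumnStatsSketch {
    static void fetch(HMSExternalTable hmsTable, List<String> partNames, List<String> columns) {
        // Table-level stats: at most one ColumnStatisticsObj per requested column.
        List<ColumnStatisticsObj> tableStats = hmsTable.getHiveTableColumnStats(columns);
        // Partition-level stats, keyed by HMS partition name such as "dt=2022-11-17".
        Map<String, List<ColumnStatisticsObj>> partitionStats =
                hmsTable.getHivePartitionColumnStats(partNames, columns);
        System.out.println(tableStats.size() + " table-level objects, "
                + partitionStats.size() + " partitions with stats");
    }
}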

View File

@@ -1909,5 +1909,12 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true, masterOnly = false)
public static boolean use_fuzzy_session_variable = false;
/**
* Collect external table statistics by running SQL when set to true.
* Otherwise, collect them from the external catalog metadata.
*/
@ConfField(mutable = true)
public static boolean collect_external_table_stats_by_sql = false;
}
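Since the flag is declared mutable, it can presumably also be toggled at runtime rather than only through fe.conf; a hedged example using Doris' standard statement for mutable FE configs:

ADMIN SET FRONTEND CONFIG ("collect_external_table_stats_by_sql" = "true");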

View File

@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
@@ -37,6 +38,7 @@ import org.apache.logging.log4j.Logger;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
/**
@@ -126,6 +128,23 @@ public class PooledHiveMetaStoreClient {
}
}
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
try (CachedClient client = getClient()) {
return client.client.getTableColumnStatistics(dbName, tblName, columns);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
String dbName, String tblName, List<String> partNames, List<String> columns) {
try (CachedClient client = getClient()) {
return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private class CachedClient implements AutoCloseable {
private final IMetaStoreClient client;

View File

@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.ConnectContext;
@@ -40,17 +41,17 @@ public class AnalysisJob {
private final AnalysisJobScheduler analysisJobScheduler;
private final AnalysisJobInfo info;
protected final AnalysisJobInfo info;
private CatalogIf<DatabaseIf> catalog;
protected CatalogIf catalog;
private DatabaseIf<TableIf> db;
protected DatabaseIf db;
private TableIf tbl;
protected TableIf tbl;
private Column col;
protected Column col;
private StmtExecutor stmtExecutor;
protected StmtExecutor stmtExecutor;
public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
this.analysisJobScheduler = analysisJobScheduler;
@@ -65,13 +66,13 @@
String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
return;
}
db = catalog.getDb(info.dbName).orElse(null);
db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
if (db == null) {
analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis());
return;
}
tbl = db.getTable(info.tblName).orElse(null);
tbl = (TableIf) db.getTable(info.tblName).orElse(null);
if (tbl == null) {
analysisJobScheduler.updateJobStatus(
info.jobId, JobState.FAILED,
@@ -151,13 +152,13 @@ public class AnalysisJob {
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = tbl.getPartitionNames();
Set<String> partNames = ((Table) tbl).getPartitionNames();
for (String partName : partNames) {
Partition part = tbl.getPartition(partName);
Partition part = ((Table) tbl).getPartition(partName);
if (part == null) {
continue;
}
params.put("partId", String.valueOf(tbl.getPartition(partName).getId()));
params.put("partId", String.valueOf(((Table) tbl).getPartition(partName).getId()));
params.put("partName", String.valueOf(partName));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));

View File

@@ -17,10 +17,15 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.log4j.LogManager;
@@ -80,7 +85,13 @@ }
}
public synchronized void schedule(AnalysisJobInfo analysisJobInfo) {
AnalysisJob analysisJob = new AnalysisJob(this, analysisJobInfo);
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
Preconditions.checkArgument(catalog != null);
DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
Preconditions.checkArgument(db != null);
TableIf table = db.getTableNullable(analysisJobInfo.tblName);
Preconditions.checkArgument(table != null);
AnalysisJob analysisJob = table.createAnalysisJob(this, analysisJobInfo);
addToManualJobQueue(analysisJob);
if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
return;

View File

@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
import org.apache.commons.lang.NotImplementedException;
public class HMSAnalysisJob extends AnalysisJob {
protected HMSExternalTable table;
public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
table = (HMSExternalTable) tbl;
}
/**
* Collect the column-level stats for the external table through metadata.
*/
protected void getColumnStatsByMeta() throws Exception {
throw new NotImplementedException();
}
/**
* Collect the stats for the external table through SQL.
*/
protected void getColumnStatsBySql() {
throw new NotImplementedException();
}
@Override
public void execute() throws Exception {
if (Config.collect_external_table_stats_by_sql) {
getColumnStatsBySql();
} else {
getColumnStatsByMeta();
}
}
}

View File

@@ -0,0 +1,201 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.base.Preconditions;
import org.apache.commons.text.StringSubstitutor;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.DateColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DecimalColumnStatsData;
import org.apache.hadoop.hive.metastore.api.DoubleColumnStatsData;
import org.apache.hadoop.hive.metastore.api.LongColumnStatsData;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HiveAnalysisJob extends HMSAnalysisJob {
private static final Logger LOG = LogManager.getLogger(HiveAnalysisJob.class);
public static final String TOTAL_SIZE = "totalSize";
public static final String NUM_ROWS = "numRows";
public static final String NUM_FILES = "numFiles";
public static final String TIMESTAMP = "transient_lastDdlTime";
public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
}
private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', '${partId}', "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', NULL, "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
@Override
protected void getColumnStatsByMeta() throws Exception {
List<String> columns = new ArrayList<>();
columns.add(col.getName());
Map<String, String> params = new HashMap<>();
params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("colId", String.valueOf(col.getName()));
// Get table level information.
Map<String, String> parameters = table.getRemoteTable().getParameters();
// Collect table level row count, null count and timestamp.
setParameterData(parameters, params);
params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName()));
List<ColumnStatisticsObj> tableStats = table.getHiveTableColumnStats(columns);
// Collect table level ndv, nulls, min and max. tableStats contains at most one item.
for (ColumnStatisticsObj tableStat : tableStats) {
if (!tableStat.isSetStatsData()) {
continue;
}
ColumnStatisticsData data = tableStat.getStatsData();
getStatData(data, params);
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
this.stmtExecutor = new StmtExecutor(connectContext, sql);
this.stmtExecutor.execute();
// Get partition level information.
List<String> partitions = ((HMSExternalCatalog)
catalog).getClient().listPartitionNames(db.getFullName(), table.getName());
Map<String, List<ColumnStatisticsObj>> columnStats = table.getHivePartitionColumnStats(partitions, columns);
List<String> partitionAnalysisSQLs = new ArrayList<>();
for (Map.Entry<String, List<ColumnStatisticsObj>> entry : columnStats.entrySet()) {
String partName = entry.getKey();
List<String> partitionValues = new ArrayList<>();
for (String p : partName.split("/")) {
partitionValues.add(p.split("=")[1]);
}
Partition partition = table.getPartition(partitionValues);
parameters = partition.getParameters();
// Collect row count, null count and timestamp.
setParameterData(parameters, params);
params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName()) + "-" + partName);
params.put("partId", partName);
List<ColumnStatisticsObj> value = entry.getValue();
Preconditions.checkState(value.size() == 1);
ColumnStatisticsObj stat = value.get(0);
if (!stat.isSetStatsData()) {
continue;
}
// Collect ndv, nulls, min and max for different data type.
ColumnStatisticsData data = stat.getStatsData();
getStatData(data, params);
stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
// Update partition level stats for this column.
for (String partitionSql : partitionAnalysisSQLs) {
connectContext = StatisticsUtil.buildConnectContext();
this.stmtExecutor = new StmtExecutor(connectContext, partitionSql);
this.stmtExecutor.execute();
}
}
private void getStatData(ColumnStatisticsData data, Map<String, String> params) {
long ndv = 0;
long nulls = 0;
String min;
String max;
// Collect ndv, nulls, min and max for different data type.
if (data.isSetLongStats()) {
LongColumnStatsData longStats = data.getLongStats();
ndv = longStats.getNumDVs();
nulls = longStats.getNumNulls();
min = String.valueOf(longStats.getLowValue());
max = String.valueOf(longStats.getHighValue());
} else if (data.isSetStringStats()) {
StringColumnStatsData stringStats = data.getStringStats();
ndv = stringStats.getNumDVs();
nulls = stringStats.getNumNulls();
min = "No value";
max = String.valueOf(stringStats.getMaxColLen());
} else if (data.isSetDecimalStats()) {
// TODO: Need a more accurate way to collect decimal values.
DecimalColumnStatsData decimalStats = data.getDecimalStats();
ndv = decimalStats.getNumDVs();
nulls = decimalStats.getNumNulls();
min = decimalStats.getLowValue().toString();
max = decimalStats.getHighValue().toString();
} else if (data.isSetDoubleStats()) {
DoubleColumnStatsData doubleStats = data.getDoubleStats();
ndv = doubleStats.getNumDVs();
nulls = doubleStats.getNumNulls();
min = String.valueOf(doubleStats.getLowValue());
max = String.valueOf(doubleStats.getHighValue());
} else if (data.isSetDateStats()) {
// TODO: Need a more accurate way to collect date values.
DateColumnStatsData dateStats = data.getDateStats();
ndv = dateStats.getNumDVs();
nulls = dateStats.getNumNulls();
min = dateStats.getLowValue().toString();
max = dateStats.getHighValue().toString();
} else {
throw new RuntimeException("Not supported data type.");
}
params.put("ndv", String.valueOf(ndv));
params.put("nulls", String.valueOf(nulls));
params.put("min", min);
params.put("max", max);
}
private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
long numRows = 0;
long timestamp = 0;
long dataSize = 0;
if (parameters.containsKey(NUM_ROWS)) {
numRows = Long.parseLong(parameters.get(NUM_ROWS));
}
if (parameters.containsKey(TIMESTAMP)) {
timestamp = Long.parseLong(parameters.get(TIMESTAMP));
}
if (parameters.containsKey(TOTAL_SIZE)) {
dataSize = Long.parseLong(parameters.get(TOTAL_SIZE));
}
params.put("dataSize", String.valueOf(dataSize));
params.put("numRows", String.valueOf(numRows));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
params.put("update_time", sdf.format(new Date(timestamp * 1000)));
}
}
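Two small mechanics from the job above are worth calling out, shown here as an editorial sketch rather than project code: HMS partition names come back in the "k1=v1/k2=v2" form and are split into partition values, and every SQL statement is rendered by filling the ${...} placeholders with org.apache.commons.text.StringSubstitutor. The helper names below are invented.

import org.apache.commons.text.StringSubstitutor;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HivePartitionSketch {
    // "dt=2022-11-17/country=us" -> ["2022-11-17", "us"]
    static List<String> partitionValues(String partName) {
        List<String> values = new ArrayList<>();
        for (String kv : partName.split("/")) {
            values.add(kv.split("=")[1]);
        }
        return values;
    }

    // Same mechanism the job uses to turn a template into an executable INSERT.
    static String renderSql(String template, Map<String, String> params) {
        return new StringSubstitutor(params).replace(template);
    }
}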

View File

@@ -0,0 +1,122 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.text.StringSubstitutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class IcebergAnalysisJob extends HMSAnalysisJob {
private long numRows = 0;
private long dataSize = 0;
private long numNulls = 0;
public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
}
private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '${colId}', NULL, "
+ "${numRows}, 0, ${nulls}, '0', '0', ${dataSize}, '${update_time}')";
@Override
protected void getColumnStatsByMeta() throws Exception {
Table icebergTable = getIcebergTable();
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {
processDataFile(task.file(), task.spec());
}
updateStats();
}
private Table getIcebergTable() {
org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
Configuration conf = new HdfsConfiguration();
for (Map.Entry<String, String> entry : table.getCatalog().getCatalogProperty().getProperties().entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
Map<String, String> s3Properties = table.getS3Properties();
for (Map.Entry<String, String> entry : s3Properties.entrySet()) {
conf.set(entry.getKey(), entry.getValue());
}
hiveCatalog.setConf(conf);
Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put("hive.metastore.uris", table.getMetastoreUri());
catalogProperties.put("uri", table.getMetastoreUri());
hiveCatalog.initialize("hive", catalogProperties);
return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
}
private void processDataFile(DataFile dataFile, PartitionSpec partitionSpec) {
int colId = -1;
for (Types.NestedField column : partitionSpec.schema().columns()) {
if (column.name().equals(col.getName())) {
colId = column.fieldId();
break;
}
}
if (colId == -1) {
throw new RuntimeException(String.format("Column %s not exist.", col.getName()));
}
dataSize += dataFile.columnSizes().get(colId);
numRows += dataFile.recordCount();
numNulls += dataFile.nullValueCounts().get(colId);
}
private void updateStats() throws Exception {
Map<String, String> params = new HashMap<>();
params.put("internalDB", StatisticConstants.STATISTIC_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("id", String.valueOf(tbl.getId()) + "-" + String.valueOf(col.getName()));
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("colId", String.valueOf(col.getName()));
params.put("numRows", String.valueOf(numRows));
params.put("nulls", String.valueOf(numNulls));
params.put("dataSize", String.valueOf(dataSize));
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
params.put("update_time", sdf.format(new Date()));
// Update table level stats info of this column.
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(INSERT_TABLE_SQL_TEMPLATE);
ConnectContext connectContext = StatisticsUtil.buildConnectContext();
this.stmtExecutor = new StmtExecutor(connectContext, sql);
this.stmtExecutor.execute();
}
}
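A closing note on the Iceberg path (editorial): the metadata scan above only yields per-column data size, record count and null count, so the template writes ndv as 0 and min/max as '0'. A hypothetical rendering of INSERT_TABLE_SQL_TEMPLATE, with invented ids and counts:

INSERT INTO __internal_schema.column_statistics
values ('12345-l_orderkey', '10001', '10002', '12345', 'l_orderkey', NULL,
        6001215, 0, 0, '0', '0', 75000000, '2022-11-17 00:00:00');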