[statistics] Fix hive table statistic bug (#19365)

Fix hive table statistic bug. Collect table/partition level statistics.
This commit is contained in:
Jibing-Li
2023-05-11 07:48:58 +08:00
committed by GitHub
parent 22f95fca97
commit 2d1f597413
9 changed files with 134 additions and 31 deletions

View File

@ -18,12 +18,15 @@
package org.apache.doris.catalog.external;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.HiveAnalysisTask;
import org.apache.doris.statistics.IcebergAnalysisTask;
import org.apache.doris.thrift.THiveTable;
@ -322,6 +325,18 @@ public class HMSExternalTable extends ExternalTable {
return columns;
}
@Override
public long estimatedRowCount() {
    // Look up the cached table-level statistic for this table; the empty column
    // name appears to address the table-wide entry (confirm against the stats
    // cache key scheme). When stats collection is disabled, or no cached entry
    // exists, fall back to 1 so the planner never sees a zero row count.
    ColumnStatistic stats = Config.enable_stats
            ? Env.getCurrentEnv().getStatisticsCache().getColumnStatistics(id, "")
            : ColumnStatistic.UNKNOWN;
    return stats == ColumnStatistic.UNKNOWN ? 1 : (long) stats.count;
}
private List<Column> getIcebergSchema(List<FieldSchema> hmsSchema) {
Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(this);
Schema schema = icebergTable.schema();

View File

@ -124,6 +124,7 @@ public class AnalysisManager {
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
createTaskForMVIdx(jobInfo, analysisTaskInfos, isSync);
createTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
ConnectContext ctx = ConnectContext.get();
if (!isSync || ctx.getSessionVariable().enableSaveStatisticsSyncJob) {
@ -425,6 +426,31 @@ public class AnalysisManager {
}
}
/**
 * Creates an additional table-level analysis task for an HMS external table.
 * Such a task carries no column and mainly collects table-wide statistics
 * (e.g. row count). Does nothing for histogram jobs or non-HMS tables, and
 * silently returns when the target table cannot be resolved.
 *
 * @param jobInfo       the analysis job the task is derived from
 * @param analysisTasks task map to register the created task into, keyed by task id
 * @param isSync        whether the job runs synchronously (currently unused here)
 * @throws DdlException if persisting the task info fails
 */
private void createTaskForExternalTable(AnalysisTaskInfo jobInfo,
        Map<Long, BaseAnalysisTask> analysisTasks,
        boolean isSync) throws DdlException {
    TableIf targetTable;
    try {
        targetTable = StatisticsUtil.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
    } catch (Throwable t) {
        // Best effort: an unresolvable table just means no extra task is created.
        LOG.warn(t.getMessage());
        return;
    }
    // Histogram jobs never get a table-level task.
    if (jobInfo.analysisType == AnalysisType.HISTOGRAM) {
        return;
    }
    // Only HMS external tables need this extra table-level task.
    if (targetTable.getType() != TableType.HMS_EXTERNAL_TABLE) {
        return;
    }
    long externalTaskId = Env.getCurrentEnv().getNextId();
    AnalysisTaskInfo tableLevelInfo = new AnalysisTaskInfoBuilder(jobInfo)
            .setIndexId(-1L)
            .setTaskId(externalTaskId)
            .setExternalTableLevelTask(true)
            .build();
    analysisTasks.put(externalTaskId, createTask(tableLevelInfo));
    try {
        StatisticsRepository.persistAnalysisTask(tableLevelInfo);
    } catch (Exception e) {
        throw new DdlException("Failed to create analysis task", e);
    }
}
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
if (analysisJobIdToTaskMap.get(info.jobId) == null) {
return;

View File

@ -104,11 +104,15 @@ public class AnalysisTaskInfo {
public String message;
// True means this task is a table level task for external table.
// This kind of task is mainly to collect the number of rows of a table.
public boolean externalTableLevelTask;
public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
Map<String, Set<String>> colToPartitions, String colName, Long indexId, JobType jobType,
AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
int samplePercent, int sampleRows, int maxBucketNum, long periodTimeInMs, String message,
long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType) {
long lastExecTimeInMs, AnalysisState state, ScheduleType scheduleType, boolean isExternalTableLevelTask) {
this.jobId = jobId;
this.taskId = taskId;
this.catalogName = catalogName;
@ -129,6 +133,7 @@ public class AnalysisTaskInfo {
this.lastExecTimeInMs = lastExecTimeInMs;
this.state = state;
this.scheduleType = scheduleType;
this.externalTableLevelTask = isExternalTableLevelTask;
}
@Override

View File

@ -47,6 +47,7 @@ public class AnalysisTaskInfoBuilder {
private AnalysisState state;
private ScheduleType scheduleType;
private String message;
private boolean externalTableLevelTask;
public AnalysisTaskInfoBuilder() {
}
@ -174,10 +175,16 @@ public class AnalysisTaskInfoBuilder {
return this;
}
/**
 * Marks whether the task being built is an external-table level task.
 * Such a task carries no column and mainly collects table-wide statistics
 * such as the number of rows.
 *
 * @param isTableLevel true for a table level task on an external table
 * @return this builder, for chaining
 */
public AnalysisTaskInfoBuilder setExternalTableLevelTask(boolean isTableLevel) {
    this.externalTableLevelTask = isTableLevel;
    return this;
}
public AnalysisTaskInfo build() {
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colToPartitions,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType);
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, state, scheduleType,
externalTableLevelTask);
}
public AnalysisTaskInfoBuilder copy() {
@ -201,6 +208,7 @@ public class AnalysisTaskInfoBuilder {
.setMessage(message)
.setLastExecTimeInMs(lastExecTimeInMs)
.setState(state)
.setScheduleType(scheduleType);
.setScheduleType(scheduleType)
.setExternalTableLevelTask(false);
}
}

View File

@ -142,6 +142,10 @@ public abstract class BaseAnalysisTask {
info, AnalysisState.FAILED,
String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
}
// External Table level task doesn't contain a column. Don't need to do the column related analyze.
if (info.externalTableLevelTask) {
return;
}
if (info.analysisType != null && (info.analysisType.equals(AnalysisType.COLUMN)
|| info.analysisType.equals(AnalysisType.HISTOGRAM))) {
col = tbl.getColumn(info.colName);

View File

@ -138,10 +138,9 @@ public class ColumnStatistic {
String colName = resultRow.getColumnValue("col_id");
Column col = StatisticsUtil.findColumn(catalogId, dbID, tblId, idxId, colName);
if (col == null) {
LOG.warn("Failed to deserialize column statistics, ctlId: {} dbId: {}"
+ "tblId: {} column: {} not exists",
catalogId, dbID, tblId, colName);
return ColumnStatistic.UNKNOWN;
// Col is null indicates this information is external table level info,
// which doesn't have a column.
return columnStatisticBuilder.build();
}
String min = resultRow.getColumnValue("min");
String max = resultRow.getColumnValue("max");

View File

@ -34,7 +34,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
/**
* Collect the column level stats for external table through metadata.
*/
protected void getColumnStatsByMeta() throws Exception {
protected void getStatsByMeta() throws Exception {
throw new NotImplementedException("Code is not implemented");
}
@ -42,16 +42,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Collect the stats for external table through sql.
* @return ColumnStatistics
*/
protected void getColumnStatsBySql() {
protected void getStatsBySql() {
throw new NotImplementedException("getColumnStatsBySql is not implemented");
}
@Override
public void execute() throws Exception {
if (Config.collect_external_table_stats_by_sql) {
getColumnStatsBySql();
getStatsBySql();
} else {
getColumnStatsByMeta();
getStatsByMeta();
}
}
}

View File

@ -57,21 +57,63 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
public static final String TIMESTAMP = "transient_lastDdlTime";
public static final String DELIMITER = "-";
private final boolean isTableLevelTask;
/**
 * Creates a Hive analysis task from the given task info.
 *
 * @param info the task description produced by the analysis manager
 */
public HiveAnalysisTask(AnalysisTaskInfo info) {
    super(info);
    // Table-level tasks collect table-wide stats (row count etc.);
    // non-table-level tasks collect stats for a single column.
    isTableLevelTask = info.externalTableLevelTask;
}
private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
private static final String ANALYZE_TABLE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
private static final String ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', '${partId}', "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
private static final String ANALYZE_TABLE_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '${colId}', NULL, "
+ "${numRows}, ${ndv}, ${nulls}, '${min}', '${max}', ${dataSize}, '${update_time}')";
+ " values ('${id}','${catalogId}', '${dbId}', '${tblId}', '-1', '', NULL, "
+ "${numRows}, 0, 0, '', '', ${dataSize}, '${update_time}')";
@Override
protected void getStatsByMeta() throws Exception {
if (isTableLevelTask) {
getTableStatsByMeta();
} else {
getColumnStatsByMeta();
}
}
/**
 * Collects table level statistics (row count, total data size, last update time)
 * from the HMS table parameters and persists them as one row in the internal
 * statistics table, using an empty column id and a NULL partition id.
 *
 * @throws Exception if executing the INSERT statement fails
 */
protected void getTableStatsByMeta() throws Exception {
    Map<String, String> params = new HashMap<>();
    params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
    params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
    params.put("catalogId", String.valueOf(catalog.getId()));
    params.put("dbId", String.valueOf(db.getId()));
    params.put("tblId", String.valueOf(tbl.getId()));
    params.put("colId", "");
    // Get table level information from the remote HMS table parameters.
    Map<String, String> parameters = table.getRemoteTable().getParameters();
    // Collect table level row count and update timestamp.
    setParameterData(parameters, params);
    // Fix: default dataSize to "0" when totalSize is absent. Previously nothing
    // set this key in that case (setParameterData no longer does), so the
    // ${dataSize} placeholder survived substitution and produced malformed SQL.
    params.put("dataSize", parameters.getOrDefault(TOTAL_SIZE, "0"));
    params.put("id", genColumnStatId(tbl.getId(), -1, "", null));
    StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
    String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
    try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
        r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
        this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
        this.stmtExecutor.execute();
    }
}
protected void getColumnStatsByMeta() throws Exception {
List<String> columns = new ArrayList<>();
columns.add(col.getName());
@ -89,16 +131,17 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
setParameterData(parameters, params);
params.put("id", genColumnStatId(tbl.getId(), -1, col.getName(), null));
List<ColumnStatisticsObj> tableStats = table.getHiveTableColumnStats(columns);
long rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0;
// Collect table level ndv, nulls, min and max. tableStats contains at most 1 item;
for (ColumnStatisticsObj tableStat : tableStats) {
if (!tableStat.isSetStatsData()) {
continue;
}
ColumnStatisticsData data = tableStat.getStatsData();
getStatData(data, params);
getStatData(data, params, rowCount);
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_TABLE_SQL_TEMPLATE);
String sql = stringSubstitutor.replace(ANALYZE_TABLE_COLUMN_SQL_TEMPLATE);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
@ -128,11 +171,12 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
if (!stat.isSetStatsData()) {
continue;
}
rowCount = parameters.containsKey(NUM_ROWS) ? Long.parseLong(parameters.get(NUM_ROWS)) : 0;
// Collect ndv, nulls, min and max for different data type.
ColumnStatisticsData data = stat.getStatsData();
getStatData(data, params);
getStatData(data, params, rowCount);
stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_SQL_TEMPLATE));
}
// Update partition level stats for this column.
for (String partitionSql : partitionAnalysisSQLs) {
@ -145,11 +189,15 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(tbl.getId(), -1, col.getName());
}
private void getStatData(ColumnStatisticsData data, Map<String, String> params) {
private void getStatData(ColumnStatisticsData data, Map<String, String> params, long rowCount) {
long ndv = 0;
long nulls = 0;
String min = "";
String max = "";
long colSize = 0;
if (!data.isSetStringStats()) {
colSize = rowCount * col.getType().getSlotSize();
}
// Collect ndv, nulls, min and max for different data type.
if (data.isSetLongStats()) {
LongColumnStatsData longStats = data.getLongStats();
@ -161,6 +209,8 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
StringColumnStatsData stringStats = data.getStringStats();
ndv = stringStats.getNumDVs();
nulls = stringStats.getNumNulls();
double avgColLen = stringStats.getAvgColLen();
colSize = Math.round(avgColLen * rowCount);
} else if (data.isSetDecimalStats()) {
DecimalColumnStatsData decimalStats = data.getDecimalStats();
ndv = decimalStats.getNumDVs();
@ -211,25 +261,21 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
params.put("nulls", String.valueOf(nulls));
params.put("min", min);
params.put("max", max);
params.put("dataSize", String.valueOf(colSize));
}
/**
 * Copies table/partition level metadata from the HMS parameter map into the
 * SQL substitution map: the row count ({@code numRows}) and the last update
 * time derived from {@code transient_lastDdlTime}.
 *
 * NOTE(review): reconstructed from the diff; the removed pre-commit variant
 * used numeric locals with 0 defaults — confirm against the merged file.
 *
 * @param parameters HMS table or partition parameters (source)
 * @param params     SQL template substitution values (destination, mutated)
 */
private void setParameterData(Map<String, String> parameters, Map<String, String> params) {
    String numRows = "";
    String timestamp = "";
    if (parameters.containsKey(NUM_ROWS)) {
        numRows = parameters.get(NUM_ROWS);
    }
    if (parameters.containsKey(TIMESTAMP)) {
        timestamp = parameters.get(TIMESTAMP);
    }
    params.put("numRows", numRows);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Fix: Long.parseLong("") throws NumberFormatException when the
    // transient_lastDdlTime parameter is missing; fall back to epoch 0,
    // matching the pre-commit default.
    long lastDdlTime = timestamp.isEmpty() ? 0L : Long.parseLong(timestamp);
    params.put("update_time", sdf.format(new Date(lastDdlTime * 1000)));
}
private String genColumnStatId(long tableId, long indexId, String columnName, String partitionName) {

View File

@ -56,7 +56,7 @@ public class IcebergAnalysisTask extends HMSAnalysisTask {
@Override
protected void getColumnStatsByMeta() throws Exception {
protected void getStatsByMeta() throws Exception {
Table icebergTable = getIcebergTable();
TableScan tableScan = icebergTable.newScan().includeColumnStats();
for (FileScanTask task : tableScan.planFiles()) {