[refactor](stats) Refactor TableStatsMeta

1. Add an abstraction for column stats status, which is required for further optimization and feature development
2. Enable the analyze test in p0 that was unexpectedly disabled before
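
The new per-column status object introduced here is ColStatsMeta, looked up through AnalysisManager.findColStatsMeta(tblId, colName); both appear in the diff below. A minimal sketch of how a caller might consult it, assuming only this commit's API — the wrapper class and method below are hypothetical and not part of the change:

import org.apache.doris.catalog.Env;
import org.apache.doris.statistics.ColStatsMeta;

public class ColStatsMetaLookupSketch {
    // Hypothetical helper: report whether a column carries per-column stats metadata
    // and print the fields this refactor tracks (method, type, trigger, query times, time).
    public static boolean describeColumnStats(long tblId, String colName) {
        ColStatsMeta meta = Env.getCurrentEnv().getAnalysisManager()
                .findColStatsMeta(tblId, colName);
        if (meta == null) {
            return false; // column never analyzed, or its stats were dropped
        }
        System.out.printf("method=%s type=%s trigger=%s queried=%d updated=%d%n",
                meta.analysisMethod, meta.analysisType, meta.jobType,
                meta.queriedTimes.get(), meta.updatedTime);
        return true;
    }
}

This is the same lookup ShowColumnStatsStmt now performs to fill its new method, type, trigger, and query_times columns.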
Author: AKIRA
Date: 2023-10-07 20:48:54 +09:00
Committed by: GitHub
Parent: 8953179c11
Commit: e5fe4e5b83
21 changed files with 234 additions and 86 deletions

View File

@ -32,6 +32,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.ColStatsMeta;
import org.apache.doris.statistics.ColumnStatistic;
import com.google.common.collect.ImmutableList;
@ -54,6 +55,10 @@ public class ShowColumnStatsStmt extends ShowStmt {
.add("avg_size_byte")
.add("min")
.add("max")
.add("method")
.add("type")
.add("trigger")
.add("query_times")
.add("updated_time")
.build();
@ -136,6 +141,7 @@ public class ShowColumnStatsStmt extends ShowStmt {
if (p.second.isUnKnown) {
return;
}
List<String> row = Lists.newArrayList();
row.add(p.first);
row.add(String.valueOf(p.second.count));
@ -145,6 +151,12 @@ public class ShowColumnStatsStmt extends ShowStmt {
row.add(String.valueOf(p.second.avgSizeByte));
row.add(String.valueOf(p.second.minExpr == null ? "N/A" : p.second.minExpr.toSql()));
row.add(String.valueOf(p.second.maxExpr == null ? "N/A" : p.second.maxExpr.toSql()));
ColStatsMeta colStatsMeta = Env.getCurrentEnv().getAnalysisManager().findColStatsMeta(table.getId(),
p.first);
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.analysisMethod));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.analysisType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.jobType));
row.add(String.valueOf(colStatsMeta == null ? "N/A" : colStatsMeta.queriedTimes));
row.add(String.valueOf(p.second.updatedTime));
result.add(row);
});

View File

@ -32,7 +32,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.ShowResultSetMetaData;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@ -49,8 +49,6 @@ public class ShowTableStatsStmt extends ShowStmt {
.add("updated_rows")
.add("query_times")
.add("row_count")
.add("method")
.add("type")
.add("updated_time")
.add("columns")
.add("trigger")
@ -132,7 +130,7 @@ public class ShowTableStatsStmt extends ShowStmt {
return table.getPartition(partitionName).getId();
}
public ShowResultSet constructResultSet(TableStats tableStatistic) {
public ShowResultSet constructResultSet(TableStatsMeta tableStatistic) {
if (tableStatistic == null) {
return new ShowResultSet(getMetaData(), new ArrayList<>());
}
@ -141,8 +139,6 @@ public class ShowTableStatsStmt extends ShowStmt {
row.add(String.valueOf(tableStatistic.updatedRows));
row.add(String.valueOf(tableStatistic.queriedTimes.get()));
row.add(String.valueOf(tableStatistic.rowCount));
row.add(tableStatistic.analysisMethod.toString());
row.add(tableStatistic.analysisType.toString());
row.add(new Date(tableStatistic.updatedTime).toString());
row.add(tableStatistic.analyzeColumns().toString());
row.add(tableStatistic.jobType.toString());

View File

@ -56,7 +56,7 @@ import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HistogramTask;
import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@ -1125,12 +1125,15 @@ public class OlapTable extends Table {
return new MVAnalysisTask(info);
}
@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
if (tblStats == null) {
return true;
}
long rowCount = getRowCount();
// TODO: Do we need to analyze an empty table?
if (rowCount == 0) {
return false;
}
if (!tblStats.analyzeColumns().containsAll(getBaseSchema()
.stream()
.map(Column::getName)
@ -1145,7 +1148,7 @@ public class OlapTable extends Table {
@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
TableIf table = this;
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
Set<String> allPartitions = table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {

View File

@ -30,7 +30,7 @@ import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.base.Preconditions;
@ -575,7 +575,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
public void analyze(String dbName) {}
@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
return true;
}

View File

@ -23,7 +23,7 @@ import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.collect.Lists;
@ -138,7 +138,7 @@ public interface TableIf {
Optional<ColumnStatistic> getColumnStatistic(String colName);
boolean needReAnalyzeTable(TableStats tblStats);
boolean needReAnalyzeTable(TableStatsMeta tblStats);
Map<String, Set<String>> findReAnalyzeNeededPartitions();

View File

@ -35,7 +35,7 @@ import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.collect.Sets;
@ -383,7 +383,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
}
@Override
public boolean needReAnalyzeTable(TableStats tblStats) {
public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
// TODO: Find a way to decide if this external table needs to be reanalyzed.
// For now, simply return true for all external tables.
return true;

View File

@ -32,7 +32,7 @@ import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.ColumnStatisticBuilder;
import org.apache.doris.statistics.HMSAnalysisTask;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
@ -436,7 +436,7 @@ public class HMSExternalTable extends ExternalTable {
@Override
public long estimatedRowCount() {
try {
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
if (tableStats != null) {
long rowCount = tableStats.rowCount;
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);

View File

@ -24,7 +24,7 @@ import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.logging.log4j.LogManager;
@ -111,7 +111,7 @@ public class JdbcExternalTable extends ExternalTable {
@Override
public long getRowCount() {
makeSureInitialized();
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(id);
if (tableStats != null) {
long rowCount = tableStats.rowCount;
LOG.debug("Estimated row count for db {} table {} is {}.", dbName, name, rowCount);

View File

@ -126,7 +126,7 @@ import org.apache.doris.resource.workloadgroup.WorkloadGroup;
import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -889,7 +889,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_UPDATE_TABLE_STATS: {
data = TableStats.read(in);
data = TableStatsMeta.read(in);
isRead = true;
break;
}

View File

@ -87,7 +87,7 @@ import org.apache.doris.scheduler.job.Job;
import org.apache.doris.scheduler.job.JobTask;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@ -1124,7 +1124,7 @@ public class EditLog {
break;
}
case OperationType.OP_UPDATE_TABLE_STATS: {
env.getAnalysisManager().replayUpdateTableStatsStatus((TableStats) journal.getData());
env.getAnalysisManager().replayUpdateTableStatsStatus((TableStatsMeta) journal.getData());
break;
}
case OperationType.OP_PERSIST_AUTO_JOB: {
@ -1981,7 +1981,7 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_AUTO_INCREMENT_ID, log);
}
public void logCreateTableStats(TableStats tableStats) {
public void logCreateTableStats(TableStatsMeta tableStats) {
logEdit(OperationType.OP_UPDATE_TABLE_STATS, tableStats);
}

View File

@ -199,7 +199,7 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.Histogram;
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.doris.statistics.TableStats;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.query.QueryStatsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Diagnoser;
@ -2454,7 +2454,7 @@ public class ShowExecutor {
private void handleShowTableStats() {
ShowTableStatsStmt showTableStatsStmt = (ShowTableStatsStmt) stmt;
TableIf tableIf = showTableStatsStmt.getTable();
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(tableIf.getId());
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(tableIf.getId());
/*
HMSExternalTable will fetch the row count from HMS,
or estimate it with the file size and schema if the table has not been analyzed.

View File

@ -120,7 +120,7 @@ public class AnalysisManager extends Daemon implements Writable {
// Track and control sync analyze tasks; kept in memory only
private final ConcurrentMap<ConnectContext, SyncTaskCollection> ctxToSyncTask = new ConcurrentHashMap<>();
private final Map<Long, TableStats> idToTblStats = new ConcurrentHashMap<>();
private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();
protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);
@ -511,6 +511,7 @@ public class AnalysisManager extends Daemon implements Writable {
infoBuilder.setCatalogName(catalogName);
infoBuilder.setDbName(db);
infoBuilder.setTblName(tblName);
// TODO: Refactor later, DON'T MODIFY IT RIGHT NOW
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
for (String colName : columnNames) {
stringJoiner.add(colName);
@ -643,8 +644,14 @@ public class AnalysisManager extends Daemon implements Writable {
if (tbl instanceof ExternalTable) {
return;
}
// TODO: Set updatedRows to 0 when loadedRows of the transaction info is ready.
updateTableStatsStatus(new TableStats(tbl.getId(), tbl.estimatedRowCount(), jobInfo));
TableStatsMeta tableStats = findTableStatsStatus(tbl.getId());
if (tableStats == null) {
updateTableStatsStatus(new TableStatsMeta(tbl.getId(), tbl.estimatedRowCount(), jobInfo));
} else {
tableStats.updateByJob(jobInfo);
logCreateTableStats(tableStats);
}
}
public List<AnalysisInfo> showAnalysisJob(ShowAnalyzeStmt stmt) {
@ -726,7 +733,7 @@ public class AnalysisManager extends Daemon implements Writable {
Set<String> cols = dropStatsStmt.getColumnNames();
long tblId = dropStatsStmt.getTblId();
TableStats tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
TableStatsMeta tableStats = findTableStatsStatus(dropStatsStmt.getTblId());
if (tableStats == null) {
return;
}
@ -949,10 +956,10 @@ public class AnalysisManager extends Daemon implements Writable {
return false;
}
private static void readIdToTblStats(DataInput in, Map<Long, TableStats> map) throws IOException {
private static void readIdToTblStats(DataInput in, Map<Long, TableStatsMeta> map) throws IOException {
int size = in.readInt();
for (int i = 0; i < size; i++) {
TableStats tableStats = TableStats.read(in);
TableStatsMeta tableStats = TableStatsMeta.read(in);
map.put(tableStats.tblId, tableStats);
}
}
@ -980,7 +987,7 @@ public class AnalysisManager extends Daemon implements Writable {
private void writeTableStats(DataOutput out) throws IOException {
out.writeInt(idToTblStats.size());
for (Entry<Long, TableStats> entry : idToTblStats.entrySet()) {
for (Entry<Long, TableStatsMeta> entry : idToTblStats.entrySet()) {
entry.getValue().write(out);
}
}
@ -996,29 +1003,29 @@ public class AnalysisManager extends Daemon implements Writable {
analysisJobIdToTaskMap.put(jobId, tasks);
}
public TableStats findTableStatsStatus(long tblId) {
public TableStatsMeta findTableStatsStatus(long tblId) {
return idToTblStats.get(tblId);
}
// Invoke this when a load transaction finishes.
public void updateUpdatedRows(long tblId, long rows) {
TableStats statsStatus = idToTblStats.get(tblId);
TableStatsMeta statsStatus = idToTblStats.get(tblId);
if (statsStatus != null) {
statsStatus.updatedRows.addAndGet(rows);
logCreateTableStats(statsStatus);
}
}
public void updateTableStatsStatus(TableStats tableStats) {
public void updateTableStatsStatus(TableStatsMeta tableStats) {
replayUpdateTableStatsStatus(tableStats);
logCreateTableStats(tableStats);
}
public void replayUpdateTableStatsStatus(TableStats tableStats) {
public void replayUpdateTableStatsStatus(TableStatsMeta tableStats) {
idToTblStats.put(tableStats.tblId, tableStats);
}
public void logCreateTableStats(TableStats tableStats) {
public void logCreateTableStats(TableStatsMeta tableStats) {
Env.getCurrentEnv().getEditLog().logCreateTableStats(tableStats);
}
@ -1030,7 +1037,7 @@ public class AnalysisManager extends Daemon implements Writable {
@VisibleForTesting
protected Set<String> findReAnalyzeNeededPartitions(TableIf table) {
TableStats tableStats = findTableStatsStatus(table.getId());
TableStatsMeta tableStats = findTableStatsStatus(table.getId());
if (tableStats == null) {
return table.getPartitionNames().stream().map(table::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
@ -1071,7 +1078,7 @@ public class AnalysisManager extends Daemon implements Writable {
// Remove a column's stats status from TableStats if we failed to load some col stats after analyzing that column,
// so that we can make sure it will be analyzed again soon if the user or system submits a job for that column again.
public void removeColStatsStatus(long tblId, String colName) {
TableStats tableStats = findTableStatsStatus(tblId);
TableStatsMeta tableStats = findTableStatsStatus(tblId);
if (tableStats != null) {
tableStats.removeColumn(colName);
}
@ -1089,4 +1096,12 @@ public class AnalysisManager extends Daemon implements Writable {
public void replayTableStatsDeletion(TableStatsDeletionLog log) {
idToTblStats.remove(log.id);
}
public ColStatsMeta findColStatsMeta(long tblId, String colName) {
TableStatsMeta tableStats = findTableStatsStatus(tblId);
if (tableStats == null) {
return null;
}
return tableStats.findColumnStatsMeta(colName);
}
}

View File

@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import com.google.gson.annotations.SerializedName;
import java.util.concurrent.atomic.AtomicLong;
public class ColStatsMeta {
@SerializedName("updateTime")
public long updatedTime;
@SerializedName("method")
public AnalysisMethod analysisMethod;
@SerializedName("type")
public AnalysisType analysisType;
@SerializedName("queriedTimes")
public final AtomicLong queriedTimes = new AtomicLong();
// TODO: For columns that are manually analyzed, we should use the same analyze method as the user specified.
@SerializedName("trigger")
public JobType jobType;
public ColStatsMeta(long updatedTime, AnalysisMethod analysisMethod,
AnalysisType analysisType, JobType jobType, long queriedTimes) {
this.updatedTime = updatedTime;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.jobType = jobType;
this.queriedTimes.addAndGet(queriedTimes);
}
public void clear() {
updatedTime = 0;
}
}

View File

@ -121,7 +121,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(
new TableStats(table.getId(), Long.parseLong(rowCount), info));
new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
}
/**

View File

@ -84,7 +84,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
StatisticsUtil.execStatisticQuery(new StringSubstitutor(params).replace(ANALYZE_TABLE_COUNT_TEMPLATE));
String rowCount = columnResult.get(0).get(0);
Env.getCurrentEnv().getAnalysisManager()
.updateTableStatsStatus(new TableStats(table.getId(), Long.parseLong(rowCount), info));
.updateTableStatsStatus(new TableStatsMeta(table.getId(), Long.parseLong(rowCount), info));
}
/**

View File

@ -118,7 +118,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
if (table.getDataSize(true) < Config.huge_table_lower_bound_size_in_bytes) {
return false;
}
TableStats tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
return System.currentTimeMillis() - tableStats.updatedTime < Config.huge_table_auto_analyze_interval_in_millis;
}
@ -155,7 +155,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
TableIf table = StatisticsUtil
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
TableStats tblStats = analysisManager.findTableStatsStatus(table.getId());
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
if (!table.needReAnalyzeTable(tblStats)) {
return null;

View File

@ -20,8 +20,6 @@ package org.apache.doris.statistics;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import com.google.gson.annotations.SerializedName;
@ -29,15 +27,19 @@ import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class TableStats implements Writable {
public class TableStatsMeta implements Writable {
@SerializedName("tblId")
public final long tblId;
@SerializedName("idxId")
public final long idxId;
@SerializedName("updatedRows")
@ -51,39 +53,22 @@ public class TableStats implements Writable {
@SerializedName("rowCount")
public final long rowCount;
@SerializedName("method")
public final AnalysisMethod analysisMethod;
@SerializedName("type")
public final AnalysisType analysisType;
@SerializedName("updateTime")
public long updatedTime;
@SerializedName("colLastUpdatedTime")
private ConcurrentMap<String, Long> colLastUpdatedTime = new ConcurrentHashMap<>();
@SerializedName("colNameToColStatsMeta")
private ConcurrentMap<String, ColStatsMeta> colNameToColStatsMeta = new ConcurrentHashMap<>();
@SerializedName("trigger")
public JobType jobType;
// It's necessary to store these fields separately from AnalysisInfo, since the lifecycles of AnalysisInfo
// and TableStats are quite different.
public TableStats(long tblId, long rowCount, AnalysisInfo analyzedJob) {
public TableStatsMeta(long tblId, long rowCount, AnalysisInfo analyzedJob) {
this.tblId = tblId;
this.idxId = -1;
this.rowCount = rowCount;
analysisMethod = analyzedJob.analysisMethod;
analysisType = analyzedJob.analysisType;
updatedTime = System.currentTimeMillis();
String cols = analyzedJob.colName;
// The colName field of AnalyzeJob is formatted like "[col1, col2]"; we need to remove the brackets here
if (analyzedJob.colName.startsWith("[") && analyzedJob.colName.endsWith("]")) {
cols = cols.substring(1, cols.length() - 1);
}
for (String col : cols.split(",")) {
colLastUpdatedTime.put(col, updatedTime);
}
jobType = analyzedJob.jobType;
updateByJob(analyzedJob);
}
@Override
@ -92,30 +77,61 @@ public class TableStats implements Writable {
Text.writeString(out, json);
}
public static TableStats read(DataInput dataInput) throws IOException {
public static TableStatsMeta read(DataInput dataInput) throws IOException {
String json = Text.readString(dataInput);
TableStats tableStats = GsonUtils.GSON.fromJson(json, TableStats.class);
TableStatsMeta tableStats = GsonUtils.GSON.fromJson(json, TableStatsMeta.class);
// Might be null, counterintuitively, for compatibility
if (tableStats.colLastUpdatedTime == null) {
tableStats.colLastUpdatedTime = new ConcurrentHashMap<>();
if (tableStats.colNameToColStatsMeta == null) {
tableStats.colNameToColStatsMeta = new ConcurrentHashMap<>();
}
return tableStats;
}
public long findColumnLastUpdateTime(String colName) {
return colLastUpdatedTime.getOrDefault(colName, 0L);
ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(colName);
if (colStatsMeta == null) {
return 0;
}
return colStatsMeta.updatedTime;
}
public ColStatsMeta findColumnStatsMeta(String colName) {
return colNameToColStatsMeta.get(colName);
}
public void removeColumn(String colName) {
colLastUpdatedTime.remove(colName);
colNameToColStatsMeta.remove(colName);
}
public Set<String> analyzeColumns() {
return colLastUpdatedTime.keySet();
return colNameToColStatsMeta.keySet();
}
public void reset() {
updatedTime = 0;
colLastUpdatedTime.replaceAll((k, v) -> 0L);
colNameToColStatsMeta.values().forEach(ColStatsMeta::clear);
}
public void updateByJob(AnalysisInfo analyzedJob) {
updatedTime = System.currentTimeMillis();
String colNameStr = analyzedJob.colName;
// The colName field of AnalyzeJob is formatted like "[col1, col2]"; we need to remove the brackets here
// TODO: Refactor this later
if (analyzedJob.colName.startsWith("[") && analyzedJob.colName.endsWith("]")) {
colNameStr = colNameStr.substring(1, colNameStr.length() - 1);
}
List<String> cols = Arrays.stream(colNameStr.split(",")).map(String::trim).collect(Collectors.toList());
for (String col : cols) {
ColStatsMeta colStatsMeta = colNameToColStatsMeta.get(col);
if (colStatsMeta == null) {
colNameToColStatsMeta.put(col, new ColStatsMeta(updatedTime,
analyzedJob.analysisMethod, analyzedJob.analysisType, analyzedJob.jobType, 0));
} else {
colStatsMeta.updatedTime = updatedTime;
colStatsMeta.analysisType = analyzedJob.analysisType;
colStatsMeta.analysisMethod = analyzedJob.analysisMethod;
}
}
jobType = analyzedJob.jobType;
}
}

View File

@ -339,10 +339,11 @@ public class AnalysisManagerTest {
};
OlapTable olapTable = new OlapTable();
TableStats stats1 = new TableStats(0, 50, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats1 = new TableStatsMeta(0, 50, new AnalysisInfoBuilder().setColName("col1").build());
stats1.updatedRows.addAndGet(30);
Assertions.assertTrue(olapTable.needReAnalyzeTable(stats1));
TableStats stats2 = new TableStats(0, 190, new AnalysisInfoBuilder().setColName("col1").build());
TableStatsMeta stats2 = new TableStatsMeta(0, 190, new AnalysisInfoBuilder().setColName("col1").build());
stats2.updatedRows.addAndGet(20);
Assertions.assertFalse(olapTable.needReAnalyzeTable(stats2));

View File

@ -181,9 +181,9 @@ public class StatisticsAutoCollectorTest {
int count = 0;
TableStats[] tableStatsArr =
new TableStats[] {new TableStats(0, 0, analysisInfo),
new TableStats(0, 0, analysisInfo), null};
TableStatsMeta[] tableStatsArr =
new TableStatsMeta[] {new TableStatsMeta(0, 0, analysisInfo),
new TableStatsMeta(0, 0, analysisInfo), null};
{
tableStatsArr[0].updatedRows.addAndGet(100);
@ -191,7 +191,7 @@ public class StatisticsAutoCollectorTest {
}
@Mock
public TableStats findTableStatsStatus(long tblId) {
public TableStatsMeta findTableStatsStatus(long tblId) {
return tableStatsArr[count++];
}
};

View File

@ -55,7 +55,7 @@ testDirectories = ""
excludeGroups = ""
// these suites will not be executed
excludeSuites = "test_sql_block_rule,test_ddl,test_analyze,test_leading,test_stream_load_move_memtable,test_profile,test_broker_load,test_spark_load,test_refresh_mtmv,test_bitmap_filter,nereids_delete_mow_partial_update"
excludeSuites = "test_sql_block_rule,test_ddl,test_leading,test_stream_load_move_memtable,test_profile,test_broker_load,test_spark_load,test_refresh_mtmv,test_bitmap_filter,nereids_delete_mow_partial_update"
// these directories will not be executed
excludeDirectories = "workload_manager_p1"

View File

@ -1,3 +1,5 @@
import java.util.stream.Collectors
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
@ -878,7 +880,7 @@ PARTITION `p599` VALUES IN (599)
SHOW COLUMN CACHED STATS test_600_partition_table_analyze(id);
"""
def expected_col_stats = { r, expected_value, idx ->
def expected_col_stats = { r, expected_value, idx ->
return (int) Double.parseDouble(r[0][idx]) == expected_value
}
@ -1038,7 +1040,7 @@ PARTITION `p599` VALUES IN (599)
sql """
DROP TABLE IF EXISTS two_thousand_partition_table_test
"""
// Check analyzing a table with two thousand partitions
sql """
CREATE TABLE two_thousand_partition_table_test (col1 int(11451) not null)
DUPLICATE KEY(col1)
@ -1057,4 +1059,49 @@ PARTITION `p599` VALUES IN (599)
ANALYZE TABLE two_thousand_partition_table_test WITH SYNC;
"""
// Check stats meta management
sql """
CREATE TABLE `test_meta_management` (
`col1` varchar(11451) NOT NULL,
`col2` int(11) NOT NULL,
`col3` int(11) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(`col1`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`col1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
sql """insert into test_meta_management values(1, 2, 3);"""
sql """insert into test_meta_management values(4, 5, 6);"""
sql """insert into test_meta_management values(7, 1, 9);"""
sql """insert into test_meta_management values(3, 8, 2);"""
sql """insert into test_meta_management values(5, 2, 1);"""
sql """insert into test_meta_management values(41, 2, 3)"""
sql """ANALYZE TABLE test_meta_management WITH SYNC"""
sql """DROP STATS test_meta_management(col1)"""
def afterDropped = sql """SHOW TABLE STATS test_meta_management"""
def convert_col_list_str_to_java_collection = { cols ->
if (cols.startsWith("[") && cols.endsWith("]")) {
cols = cols.substring(1, cols.length() - 1);
}
return Arrays.stream(cols.split(",")).map(String::trim).collect(Collectors.toList())
}
def check_column = { r, expected ->
expected_result = convert_col_list_str_to_java_collection(expected)
actual_result = convert_col_list_str_to_java_collection(r[0][4])
System.out.println(expected_result)
System.out.println(actual_result)
return expected_result.containsAll(actual_result) && actual_result.containsAll(expected_result)
}
assert check_column(afterDropped, "[col2, col3]")
sql """ANALYZE TABLE test_meta_management WITH SYNC"""
afterDropped = sql """SHOW TABLE STATS test_meta_management"""
assert check_column(afterDropped, "[col1, col2, col3]")
}