[refactor](stats) Use id instead name in analysis info (#25213)
This commit is contained in:
@ -89,6 +89,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
|
||||
private boolean isAllColumns;
|
||||
|
||||
// after analyzed
|
||||
private long catalogId;
|
||||
private long dbId;
|
||||
private TableIf table;
|
||||
|
||||
@ -130,6 +131,7 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
|
||||
String tblName = tableName.getTbl();
|
||||
CatalogIf catalog = analyzer.getEnv().getCatalogMgr()
|
||||
.getCatalogOrAnalysisException(catalogName);
|
||||
this.catalogId = catalog.getId();
|
||||
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
|
||||
dbId = db.getId();
|
||||
table = db.getTableOrAnalysisException(tblName);
|
||||
@ -329,4 +331,8 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
|
||||
public boolean isAllColumns() {
|
||||
return isAllColumns;
|
||||
}
|
||||
|
||||
public long getCatalogId() {
|
||||
return catalogId;
|
||||
}
|
||||
}
|
||||
|
||||
@ -92,7 +92,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
|
||||
|
||||
@SerializedName(value = "idToCatalog")
|
||||
private final Map<Long, CatalogIf> idToCatalog = Maps.newConcurrentMap();
|
||||
private final Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> idToCatalog = Maps.newConcurrentMap();
|
||||
// this map will be regenerated from idToCatalog, so not need to persist.
|
||||
private final Map<String, CatalogIf> nameToCatalog = Maps.newConcurrentMap();
|
||||
// record last used database of every catalog
|
||||
@ -163,7 +163,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
return nameToCatalog.get(name);
|
||||
}
|
||||
|
||||
public CatalogIf getCatalog(long id) {
|
||||
public CatalogIf<? extends DatabaseIf<? extends TableIf>> getCatalog(long id) {
|
||||
return idToCatalog.get(id);
|
||||
}
|
||||
|
||||
@ -173,7 +173,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
ErrorCode.ERR_UNKNOWN_CATALOG));
|
||||
}
|
||||
|
||||
public <E extends Exception> CatalogIf getCatalogOrException(long id, Function<Long, E> e) throws E {
|
||||
public <E extends Exception> CatalogIf<? extends DatabaseIf<? extends TableIf>>
|
||||
getCatalogOrException(long id, Function<Long, E> e) throws E {
|
||||
CatalogIf catalog = idToCatalog.get(id);
|
||||
if (catalog == null) {
|
||||
throw e.apply(id);
|
||||
@ -1173,7 +1174,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
|
||||
internalCatalog = (InternalCatalog) idToCatalog.get(InternalCatalog.INTERNAL_CATALOG_ID);
|
||||
}
|
||||
|
||||
public Map<Long, CatalogIf> getIdToCatalog() {
|
||||
public Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> getIdToCatalog() {
|
||||
return idToCatalog;
|
||||
}
|
||||
|
||||
|
||||
@ -201,6 +201,7 @@ import org.apache.doris.statistics.Histogram;
|
||||
import org.apache.doris.statistics.StatisticsRepository;
|
||||
import org.apache.doris.statistics.TableStatsMeta;
|
||||
import org.apache.doris.statistics.query.QueryStatsUtil;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
import org.apache.doris.system.Backend;
|
||||
import org.apache.doris.system.Diagnoser;
|
||||
import org.apache.doris.system.SystemInfoService;
|
||||
@ -243,6 +244,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Predicate;
|
||||
@ -2650,9 +2652,16 @@ public class ShowExecutor {
|
||||
for (AnalysisInfo analysisInfo : results) {
|
||||
List<String> row = new ArrayList<>();
|
||||
row.add(String.valueOf(analysisInfo.jobId));
|
||||
row.add(analysisInfo.catalogName);
|
||||
row.add(analysisInfo.dbName);
|
||||
row.add(analysisInfo.tblName);
|
||||
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(analysisInfo.catalogId);
|
||||
row.add(c.getName());
|
||||
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
|
||||
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
|
||||
if (databaseIf.isPresent()) {
|
||||
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
|
||||
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
|
||||
} else {
|
||||
row.add("DB may get deleted");
|
||||
}
|
||||
row.add(analysisInfo.colName);
|
||||
row.add(analysisInfo.jobType.toString());
|
||||
row.add(analysisInfo.analysisType.toString());
|
||||
|
||||
@ -17,8 +17,6 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.FeMetaVersion;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.persist.gson.GsonUtils;
|
||||
@ -37,8 +35,6 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Type;
|
||||
import java.text.ParseException;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -91,14 +87,14 @@ public class AnalysisInfo implements Writable {
|
||||
@SerializedName("taskIds")
|
||||
public final List<Long> taskIds;
|
||||
|
||||
@SerializedName("catalogName")
|
||||
public final String catalogName;
|
||||
@SerializedName("catalogId")
|
||||
public final long catalogId;
|
||||
|
||||
@SerializedName("dbName")
|
||||
public final String dbName;
|
||||
@SerializedName("dbId")
|
||||
public final long dbId;
|
||||
|
||||
@SerializedName("tblName")
|
||||
public final String tblName;
|
||||
@SerializedName("tblId")
|
||||
public final long tblId;
|
||||
|
||||
// TODO: Map here is wired, List is enough
|
||||
@SerializedName("colToPartitions")
|
||||
@ -183,7 +179,7 @@ public class AnalysisInfo implements Writable {
|
||||
@SerializedName("forceFull")
|
||||
public final boolean forceFull;
|
||||
|
||||
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, String catalogName, String dbName, String tblName,
|
||||
public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId, long dbId, long tblId,
|
||||
Map<String, Set<String>> colToPartitions, Set<String> partitionNames, String colName, Long indexId,
|
||||
JobType jobType, AnalysisMode analysisMode, AnalysisMethod analysisMethod, AnalysisType analysisType,
|
||||
int samplePercent, long sampleRows, int maxBucketNum, long periodTimeInMs, String message,
|
||||
@ -193,9 +189,9 @@ public class AnalysisInfo implements Writable {
|
||||
this.jobId = jobId;
|
||||
this.taskId = taskId;
|
||||
this.taskIds = taskIds;
|
||||
this.catalogName = catalogName;
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
this.catalogId = catalogId;
|
||||
this.dbId = dbId;
|
||||
this.tblId = tblId;
|
||||
this.colToPartitions = colToPartitions;
|
||||
this.partitionNames = partitionNames;
|
||||
this.colName = colName;
|
||||
@ -229,9 +225,9 @@ public class AnalysisInfo implements Writable {
|
||||
public String toString() {
|
||||
StringJoiner sj = new StringJoiner("\n", getClass().getName() + ":\n", "\n");
|
||||
sj.add("JobId: " + jobId);
|
||||
sj.add("CatalogName: " + catalogName);
|
||||
sj.add("DBName: " + dbName);
|
||||
sj.add("TableName: " + tblName);
|
||||
sj.add("catalogId: " + catalogId);
|
||||
sj.add("dbId: " + dbId);
|
||||
sj.add("TableName: " + tblId);
|
||||
sj.add("ColumnName: " + colName);
|
||||
sj.add("TaskType: " + analysisType);
|
||||
sj.add("TaskMode: " + analysisMode);
|
||||
@ -291,7 +287,8 @@ public class AnalysisInfo implements Writable {
|
||||
return null;
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
Type type = new TypeToken<Map<String, Set<String>>>() {}.getType();
|
||||
Type type = new TypeToken<Map<String, Set<String>>>() {
|
||||
}.getType();
|
||||
return gson.fromJson(colToPartitionStr, type);
|
||||
}
|
||||
|
||||
@ -302,53 +299,15 @@ public class AnalysisInfo implements Writable {
|
||||
}
|
||||
|
||||
public static AnalysisInfo read(DataInput dataInput) throws IOException {
|
||||
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_123) {
|
||||
AnalysisInfoBuilder analysisInfoBuilder = new AnalysisInfoBuilder();
|
||||
analysisInfoBuilder.setJobId(dataInput.readLong());
|
||||
long taskId = dataInput.readLong();
|
||||
analysisInfoBuilder.setTaskId(taskId);
|
||||
analysisInfoBuilder.setCatalogName(Text.readString(dataInput));
|
||||
analysisInfoBuilder.setDbName(Text.readString(dataInput));
|
||||
analysisInfoBuilder.setTblName(Text.readString(dataInput));
|
||||
int size = dataInput.readInt();
|
||||
Map<String, Set<String>> colToPartitions = new HashMap<>();
|
||||
for (int i = 0; i < size; i++) {
|
||||
String k = Text.readString(dataInput);
|
||||
int partSize = dataInput.readInt();
|
||||
Set<String> parts = new HashSet<>();
|
||||
for (int j = 0; j < partSize; j++) {
|
||||
parts.add(Text.readString(dataInput));
|
||||
}
|
||||
colToPartitions.put(k, parts);
|
||||
String json = Text.readString(dataInput);
|
||||
AnalysisInfo analysisInfo = GsonUtils.GSON.fromJson(json, AnalysisInfo.class);
|
||||
if (analysisInfo.cronExprStr != null) {
|
||||
try {
|
||||
analysisInfo.cronExpression = new CronExpression(analysisInfo.cronExprStr);
|
||||
} catch (ParseException e) {
|
||||
LOG.warn("Cron expression of job is invalid, there is a bug", e);
|
||||
}
|
||||
analysisInfoBuilder.setColToPartitions(colToPartitions);
|
||||
analysisInfoBuilder.setColName(Text.readString(dataInput));
|
||||
analysisInfoBuilder.setIndexId(dataInput.readLong());
|
||||
analysisInfoBuilder.setJobType(JobType.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setAnalysisMode(AnalysisMode.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setAnalysisMethod(AnalysisMethod.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setAnalysisType(AnalysisType.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setSamplePercent(dataInput.readInt());
|
||||
analysisInfoBuilder.setSampleRows(dataInput.readInt());
|
||||
analysisInfoBuilder.setMaxBucketNum(dataInput.readInt());
|
||||
analysisInfoBuilder.setPeriodTimeInMs(dataInput.readLong());
|
||||
analysisInfoBuilder.setLastExecTimeInMs(dataInput.readLong());
|
||||
analysisInfoBuilder.setState(AnalysisState.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setScheduleType(ScheduleType.valueOf(Text.readString(dataInput)));
|
||||
analysisInfoBuilder.setMessage(Text.readString(dataInput));
|
||||
analysisInfoBuilder.setExternalTableLevelTask(dataInput.readBoolean());
|
||||
return analysisInfoBuilder.build();
|
||||
} else {
|
||||
String json = Text.readString(dataInput);
|
||||
AnalysisInfo analysisInfo = GsonUtils.GSON.fromJson(json, AnalysisInfo.class);
|
||||
if (analysisInfo.cronExprStr != null) {
|
||||
try {
|
||||
analysisInfo.cronExpression = new CronExpression(analysisInfo.cronExprStr);
|
||||
} catch (ParseException e) {
|
||||
LOG.warn("Cron expression of job is invalid, there is a bug", e);
|
||||
}
|
||||
}
|
||||
return analysisInfo;
|
||||
}
|
||||
return analysisInfo;
|
||||
}
|
||||
}
|
||||
|
||||
@ -33,9 +33,9 @@ public class AnalysisInfoBuilder {
|
||||
private long jobId;
|
||||
private long taskId;
|
||||
private List<Long> taskIds;
|
||||
private String catalogName;
|
||||
private String dbName;
|
||||
private String tblName;
|
||||
private long catalogId;
|
||||
private long dbId;
|
||||
private long tblId;
|
||||
private Map<String, Set<String>> colToPartitions;
|
||||
private Set<String> partitionNames;
|
||||
private String colName;
|
||||
@ -68,9 +68,9 @@ public class AnalysisInfoBuilder {
|
||||
jobId = info.jobId;
|
||||
taskId = info.taskId;
|
||||
taskIds = info.taskIds;
|
||||
catalogName = info.catalogName;
|
||||
dbName = info.dbName;
|
||||
tblName = info.tblName;
|
||||
catalogId = info.catalogId;
|
||||
dbId = info.dbId;
|
||||
tblId = info.tblId;
|
||||
colToPartitions = info.colToPartitions;
|
||||
partitionNames = info.partitionNames;
|
||||
colName = info.colName;
|
||||
@ -112,18 +112,18 @@ public class AnalysisInfoBuilder {
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder setCatalogName(String catalogName) {
|
||||
this.catalogName = catalogName;
|
||||
public AnalysisInfoBuilder setCatalogId(long catalogId) {
|
||||
this.catalogId = catalogId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder setDbName(String dbName) {
|
||||
this.dbName = dbName;
|
||||
public AnalysisInfoBuilder setDBId(long dbId) {
|
||||
this.dbId = dbId;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AnalysisInfoBuilder setTblName(String tblName) {
|
||||
this.tblName = tblName;
|
||||
public AnalysisInfoBuilder setTblId(long tblId) {
|
||||
this.tblId = tblId;
|
||||
return this;
|
||||
}
|
||||
|
||||
@ -246,7 +246,7 @@ public class AnalysisInfoBuilder {
|
||||
}
|
||||
|
||||
public AnalysisInfo build() {
|
||||
return new AnalysisInfo(jobId, taskId, taskIds, catalogName, dbName, tblName, colToPartitions, partitionNames,
|
||||
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
|
||||
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
|
||||
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
|
||||
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
|
||||
@ -258,9 +258,9 @@ public class AnalysisInfoBuilder {
|
||||
.setJobId(jobId)
|
||||
.setTaskId(taskId)
|
||||
.setTaskIds(taskIds)
|
||||
.setCatalogName(catalogName)
|
||||
.setDbName(dbName)
|
||||
.setTblName(tblName)
|
||||
.setCatalogId(catalogId)
|
||||
.setDBId(dbId)
|
||||
.setTblId(tblId)
|
||||
.setColToPartitions(colToPartitions)
|
||||
.setColName(colName)
|
||||
.setIndexId(indexId)
|
||||
|
||||
@ -44,6 +44,7 @@ import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
import org.apache.doris.common.util.Daemon;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.AnalyzeDeletionLog;
|
||||
import org.apache.doris.persist.TableStatsDeletionLog;
|
||||
@ -56,6 +57,7 @@ import org.apache.doris.statistics.AnalysisInfo.AnalysisMode;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
import org.apache.doris.statistics.util.DBObjects;
|
||||
import org.apache.doris.statistics.util.SimpleQueue;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
@ -84,6 +86,7 @@ import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.TreeMap;
|
||||
@ -365,7 +368,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
|
||||
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
|
||||
if (!jobInfo.partitionOnly && stmt.isAllColumns()
|
||||
&& StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
|
||||
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
|
||||
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
|
||||
}
|
||||
if (isSync) {
|
||||
@ -398,9 +401,16 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
List<String> row = new ArrayList<>();
|
||||
row.add(String.valueOf(analysisInfo.jobId));
|
||||
row.add(analysisInfo.catalogName);
|
||||
row.add(analysisInfo.dbName);
|
||||
row.add(analysisInfo.tblName);
|
||||
CatalogIf<? extends DatabaseIf<? extends TableIf>> c = StatisticsUtil.findCatalog(analysisInfo.catalogId);
|
||||
row.add(c.getName());
|
||||
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = c.getDb(analysisInfo.dbId);
|
||||
row.add(databaseIf.isPresent() ? databaseIf.get().getFullName() : "DB may get deleted");
|
||||
if (databaseIf.isPresent()) {
|
||||
Optional<? extends TableIf> table = databaseIf.get().getTable(analysisInfo.tblId);
|
||||
row.add(table.isPresent() ? table.get().getName() : "Table may get deleted");
|
||||
} else {
|
||||
row.add("DB not exists anymore");
|
||||
}
|
||||
row.add(analysisInfo.colName);
|
||||
resultRows.add(row);
|
||||
}
|
||||
@ -487,11 +497,6 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlException {
|
||||
AnalysisInfoBuilder infoBuilder = new AnalysisInfoBuilder();
|
||||
long jobId = Env.getCurrentEnv().getNextId();
|
||||
String catalogName = stmt.getCatalogName();
|
||||
String db = stmt.getDBName();
|
||||
TableName tbl = stmt.getTblName();
|
||||
StatisticsUtil.convertTableNameToObjects(tbl);
|
||||
String tblName = tbl.getTbl();
|
||||
TableIf table = stmt.getTable();
|
||||
Set<String> columnNames = stmt.getColumnNames();
|
||||
Set<String> partitionNames = stmt.getPartitionNames();
|
||||
@ -508,9 +513,9 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
CronExpression cronExpression = stmt.getCron();
|
||||
|
||||
infoBuilder.setJobId(jobId);
|
||||
infoBuilder.setCatalogName(catalogName);
|
||||
infoBuilder.setDbName(db);
|
||||
infoBuilder.setTblName(tblName);
|
||||
infoBuilder.setCatalogId(stmt.getCatalogId());
|
||||
infoBuilder.setDBId(stmt.getDbId());
|
||||
infoBuilder.setTblId(stmt.getTable().getId());
|
||||
// TODO: Refactor later, DON'T MODIFY IT RIGHT NOW
|
||||
StringJoiner stringJoiner = new StringJoiner(",", "[", "]");
|
||||
for (String colName : columnNames) {
|
||||
@ -638,8 +643,8 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
|
||||
@VisibleForTesting
|
||||
public void updateTableStats(AnalysisInfo jobInfo) {
|
||||
TableIf tbl = StatisticsUtil.findTable(jobInfo.catalogName,
|
||||
jobInfo.dbName, jobInfo.tblName);
|
||||
TableIf tbl = StatisticsUtil.findTable(jobInfo.catalogId,
|
||||
jobInfo.dbId, jobInfo.tblId);
|
||||
// External Table update table stats after table level task finished.
|
||||
if (tbl instanceof ExternalTable) {
|
||||
return;
|
||||
@ -667,11 +672,15 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
protected List<AnalysisInfo> findShowAnalyzeResult(Collection<AnalysisInfo> analysisInfos, ShowAnalyzeStmt stmt) {
|
||||
String state = stmt.getStateValue();
|
||||
TableName tblName = stmt.getDbTableName();
|
||||
TableIf tbl = null;
|
||||
if (tblName != null) {
|
||||
tbl = StatisticsUtil.findTable(tblName.getCtl(), tblName.getDb(), tblName.getTbl());
|
||||
}
|
||||
long tblId = tbl == null ? -1 : tbl.getId();
|
||||
return analysisInfos.stream()
|
||||
.filter(a -> stmt.getJobId() == 0 || a.jobId == stmt.getJobId())
|
||||
.filter(a -> state == null || a.state.equals(AnalysisState.valueOf(state)))
|
||||
.filter(a -> tblName == null || a.catalogName.equals(tblName.getCtl())
|
||||
&& a.dbName.equals(tblName.getDb()) && a.tblName.equals(tblName.getTbl()))
|
||||
.filter(a -> tblName == null || a.tblId == tblId)
|
||||
.sorted(Comparator.comparingLong(a -> a.jobId))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
@ -778,8 +787,11 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
}
|
||||
|
||||
private void checkPriv(AnalysisInfo analysisInfo) {
|
||||
DBObjects dbObjects = StatisticsUtil.convertIdToObjects(analysisInfo.catalogId,
|
||||
analysisInfo.dbId, analysisInfo.tblId);
|
||||
if (!Env.getCurrentEnv().getAccessManager()
|
||||
.checkTblPriv(ConnectContext.get(), analysisInfo.dbName, analysisInfo.tblName, PrivPredicate.SELECT)) {
|
||||
.checkTblPriv(ConnectContext.get(), dbObjects.catalog.getName(), dbObjects.db.getFullName(),
|
||||
dbObjects.table.getName(), PrivPredicate.SELECT)) {
|
||||
throw new RuntimeException("You need at least SELECT PRIV to corresponding table to kill this analyze"
|
||||
+ " job");
|
||||
}
|
||||
@ -794,8 +806,8 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
|
||||
private BaseAnalysisTask createTask(AnalysisInfo analysisInfo) throws DdlException {
|
||||
try {
|
||||
TableIf table = StatisticsUtil.findTable(analysisInfo.catalogName,
|
||||
analysisInfo.dbName, analysisInfo.tblName);
|
||||
TableIf table = StatisticsUtil.findTable(analysisInfo.catalogId,
|
||||
analysisInfo.dbId, analysisInfo.tblId);
|
||||
return table.createAnalysisTask(analysisInfo);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to find table", t);
|
||||
|
||||
@ -30,6 +30,7 @@ import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
|
||||
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
|
||||
import org.apache.doris.statistics.util.DBObjects;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
@ -94,9 +95,9 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
protected AnalysisInfo info;
|
||||
|
||||
protected CatalogIf catalog;
|
||||
protected CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
|
||||
|
||||
protected DatabaseIf db;
|
||||
protected DatabaseIf<? extends TableIf> db;
|
||||
|
||||
protected TableIf tbl;
|
||||
|
||||
@ -119,25 +120,11 @@ public abstract class BaseAnalysisTask {
|
||||
}
|
||||
|
||||
protected void init(AnalysisInfo info) {
|
||||
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
|
||||
if (catalog == null) {
|
||||
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
|
||||
String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
|
||||
return;
|
||||
}
|
||||
db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
|
||||
if (db == null) {
|
||||
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
|
||||
String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis());
|
||||
return;
|
||||
}
|
||||
tbl = (TableIf) db.getTable(info.tblName).orElse(null);
|
||||
if (tbl == null) {
|
||||
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
|
||||
info, AnalysisState.FAILED,
|
||||
String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
|
||||
}
|
||||
tableSample = getTableSample();
|
||||
DBObjects dbObjects = StatisticsUtil.convertIdToObjects(info.catalogId, info.dbId, info.tblId);
|
||||
catalog = dbObjects.catalog;
|
||||
db = dbObjects.db;
|
||||
tbl = dbObjects.table;
|
||||
// External Table level task doesn't contain a column. Don't need to do the column related analyze.
|
||||
if (info.externalTableLevelTask) {
|
||||
return;
|
||||
@ -146,7 +133,7 @@ public abstract class BaseAnalysisTask {
|
||||
|| info.analysisType.equals(AnalysisType.HISTOGRAM))) {
|
||||
col = tbl.getColumn(info.colName);
|
||||
if (col == null) {
|
||||
throw new RuntimeException(String.format("Column with name %s not exists", info.tblName));
|
||||
throw new RuntimeException(String.format("Column with name %s not exists", tbl.getName()));
|
||||
}
|
||||
Preconditions.checkArgument(!StatisticsUtil.isUnsupportedType(col.getType()),
|
||||
String.format("Column with type %s is not supported", col.getType().toString()));
|
||||
@ -261,7 +248,7 @@ public abstract class BaseAnalysisTask {
|
||||
QueryState queryState = stmtExecutor.getContext().getState();
|
||||
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
|
||||
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
|
||||
info.catalogName, info.dbName, info.colName, stmtExecutor.getOriginStmt().toString(),
|
||||
catalog.getName(), db.getFullName(), info.colName, stmtExecutor.getOriginStmt().toString(),
|
||||
queryState.getErrorMessage()));
|
||||
}
|
||||
} finally {
|
||||
|
||||
@ -224,7 +224,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
|
||||
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
|
||||
info.catalogName, info.dbName, info.colName, partitionCollectSQL,
|
||||
catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
|
||||
queryState.getErrorMessage()));
|
||||
}
|
||||
}
|
||||
@ -254,11 +254,11 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
|
||||
LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
|
||||
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
|
||||
catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
|
||||
throw new RuntimeException(queryState.getErrorMessage());
|
||||
}
|
||||
LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
|
||||
info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime)));
|
||||
catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -67,8 +67,8 @@ public class HistogramTask extends BaseAnalysisTask {
|
||||
params.put("tblId", String.valueOf(tbl.getId()));
|
||||
params.put("idxId", String.valueOf(info.indexId));
|
||||
params.put("colId", String.valueOf(info.colName));
|
||||
params.put("dbName", info.dbName);
|
||||
params.put("tblName", String.valueOf(info.tblName));
|
||||
params.put("dbName", db.getFullName());
|
||||
params.put("tblName", tbl.getName());
|
||||
params.put("colName", String.valueOf(info.colName));
|
||||
params.put("sampleRate", getSampleRateFunction());
|
||||
params.put("maxBucketNum", String.valueOf(info.maxBucketNum));
|
||||
|
||||
@ -130,11 +130,11 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
|
||||
LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
|
||||
info.catalogName, info.dbName, info.colName, sql, queryState.getErrorMessage()));
|
||||
catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
|
||||
throw new RuntimeException(queryState.getErrorMessage());
|
||||
}
|
||||
LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
|
||||
info.catalogName, info.dbName, info.colName, sql, (System.currentTimeMillis() - startTime)));
|
||||
catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -114,9 +114,9 @@ public class MVAnalysisTask extends BaseAnalysisTask {
|
||||
String.valueOf(olapTable.getPartition(partName).getId());
|
||||
params.put("partId", partId);
|
||||
params.put("dataSizeFunction", getDataSizeFunction(column));
|
||||
params.put("dbName", info.dbName);
|
||||
params.put("dbName", db.getFullName());
|
||||
params.put("colName", colName);
|
||||
params.put("tblName", String.valueOf(info.tblName));
|
||||
params.put("tblName", tbl.getName());
|
||||
params.put("sql", sql);
|
||||
StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
|
||||
}
|
||||
|
||||
@ -137,9 +137,9 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
params.put("idxId", String.valueOf(info.indexId));
|
||||
params.put("colId", String.valueOf(info.colName));
|
||||
params.put("dataSizeFunction", getDataSizeFunction(col));
|
||||
params.put("dbName", info.dbName);
|
||||
params.put("colName", String.valueOf(info.colName));
|
||||
params.put("tblName", String.valueOf(info.tblName));
|
||||
params.put("dbName", db.getFullName());
|
||||
params.put("colName", info.colName);
|
||||
params.put("tblName", tbl.getName());
|
||||
params.put("scaleFactor", String.valueOf(scaleFactor));
|
||||
params.put("tablets", tabletStr.isEmpty() ? "" : String.format("TABLET(%s)", tabletStr));
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
@ -207,9 +207,9 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
params.put("idxId", String.valueOf(info.indexId));
|
||||
params.put("colId", String.valueOf(info.colName));
|
||||
params.put("dataSizeFunction", getDataSizeFunction(col));
|
||||
params.put("dbName", info.dbName);
|
||||
params.put("dbName", db.getFullName());
|
||||
params.put("colName", String.valueOf(info.colName));
|
||||
params.put("tblName", String.valueOf(info.tblName));
|
||||
params.put("tblName", String.valueOf(tbl.getName()));
|
||||
List<String> partitionAnalysisSQLs = new ArrayList<>();
|
||||
try {
|
||||
tbl.readLock();
|
||||
@ -249,7 +249,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
QueryState queryState = r.connectContext.getState();
|
||||
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
|
||||
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
|
||||
info.catalogName, info.dbName, info.colName, partitionCollectSQL,
|
||||
catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
|
||||
queryState.getErrorMessage()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
@ -32,7 +31,6 @@ import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hudi.common.util.VisibleForTesting;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -126,13 +124,11 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
List<AnalysisInfo> analysisInfos, TableIf table) {
|
||||
AnalysisMethod analysisMethod = table.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes
|
||||
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
|
||||
TableName tableName = new TableName(db.getCatalog().getName(), db.getFullName(),
|
||||
table.getName());
|
||||
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
|
||||
.setJobId(Env.getCurrentEnv().getNextId())
|
||||
.setCatalogName(db.getCatalog().getName())
|
||||
.setDbName(db.getFullName())
|
||||
.setTblName(tableName.getTbl())
|
||||
.setCatalogId(db.getCatalog().getId())
|
||||
.setDBId(db.getId())
|
||||
.setTblId(table.getId())
|
||||
.setColName(
|
||||
table.getBaseSchema().stream().filter(c -> !StatisticsUtil.isUnsupportedType(c.getType()))
|
||||
.map(
|
||||
@ -153,7 +149,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
@VisibleForTesting
|
||||
protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
|
||||
TableIf table = StatisticsUtil
|
||||
.findTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName);
|
||||
.findTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId);
|
||||
AnalysisManager analysisManager = Env.getServingEnv().getAnalysisManager();
|
||||
TableStatsMeta tblStats = analysisManager.findTableStatsStatus(table.getId());
|
||||
|
||||
@ -170,27 +166,4 @@ public class StatisticsAutoCollector extends StatisticsCollector {
|
||||
return new AnalysisInfoBuilder(jobInfo).setColToPartitions(needRunPartitions).build();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected AnalysisInfo getAnalysisJobInfo(AnalysisInfo jobInfo, TableIf table,
|
||||
Set<String> needRunPartitions) {
|
||||
Map<String, Set<String>> newColToPartitions = Maps.newHashMap();
|
||||
Map<String, Set<String>> colToPartitions = jobInfo.colToPartitions;
|
||||
if (colToPartitions == null) {
|
||||
for (Column c : table.getColumns()) {
|
||||
if (StatisticsUtil.isUnsupportedType(c.getType())) {
|
||||
continue;
|
||||
}
|
||||
newColToPartitions.put(c.getName(), needRunPartitions);
|
||||
}
|
||||
} else {
|
||||
colToPartitions.keySet().forEach(colName -> {
|
||||
Column column = table.getColumn(colName);
|
||||
if (column != null) {
|
||||
newColToPartitions.put(colName, needRunPartitions);
|
||||
}
|
||||
});
|
||||
}
|
||||
return new AnalysisInfoBuilder(jobInfo)
|
||||
.setColToPartitions(newColToPartitions).build();
|
||||
}
|
||||
}
|
||||
|
||||
@ -55,7 +55,7 @@ public class StatisticsCleaner extends MasterDaemon {
|
||||
private OlapTable colStatsTbl;
|
||||
private OlapTable histStatsTbl;
|
||||
|
||||
private Map<Long, CatalogIf> idToCatalog;
|
||||
private Map<Long, CatalogIf<? extends DatabaseIf<? extends TableIf>>> idToCatalog;
|
||||
private Map<Long, DatabaseIf> idToDb;
|
||||
private Map<Long, TableIf> idToTbl;
|
||||
|
||||
|
||||
@ -76,7 +76,7 @@ public abstract class StatisticsCollector extends MasterDaemon {
|
||||
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
|
||||
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
|
||||
analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
|
||||
if (StatisticsUtil.isExternalTable(jobInfo.catalogName, jobInfo.dbName, jobInfo.tblName)) {
|
||||
if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
|
||||
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
|
||||
}
|
||||
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
|
||||
|
||||
@ -23,14 +23,14 @@ import org.apache.doris.datasource.CatalogIf;
|
||||
|
||||
public class DBObjects {
|
||||
|
||||
public final CatalogIf<DatabaseIf> catalog;
|
||||
public final CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog;
|
||||
|
||||
public final DatabaseIf<TableIf> db;
|
||||
public final DatabaseIf<? extends TableIf> db;
|
||||
|
||||
public final TableIf table;
|
||||
|
||||
public DBObjects(CatalogIf<DatabaseIf> catalog,
|
||||
DatabaseIf<TableIf> db, TableIf table) {
|
||||
public DBObjects(CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog,
|
||||
DatabaseIf<? extends TableIf> db, TableIf table) {
|
||||
this.catalog = catalog;
|
||||
this.db = db;
|
||||
this.table = table;
|
||||
|
||||
@ -301,13 +301,13 @@ public class StatisticsUtil {
|
||||
|
||||
}
|
||||
|
||||
|
||||
public static DBObjects convertTableNameToObjects(TableName tableName) {
|
||||
CatalogIf<DatabaseIf> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(tableName.getCtl());
|
||||
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalogIf =
|
||||
Env.getCurrentEnv().getCatalogMgr().getCatalog(tableName.getCtl());
|
||||
if (catalogIf == null) {
|
||||
throw new IllegalStateException(String.format("Catalog:%s doesn't exist", tableName.getCtl()));
|
||||
}
|
||||
DatabaseIf<TableIf> databaseIf = catalogIf.getDbNullable(tableName.getDb());
|
||||
DatabaseIf<? extends TableIf> databaseIf = catalogIf.getDbNullable(tableName.getDb());
|
||||
if (databaseIf == null) {
|
||||
throw new IllegalStateException(String.format("DB:%s doesn't exist", tableName.getDb()));
|
||||
}
|
||||
@ -318,12 +318,17 @@ public class StatisticsUtil {
|
||||
return new DBObjects(catalogIf, databaseIf, tableIf);
|
||||
}
|
||||
|
||||
public static DBObjects convertIdToObjects(long catalogId, long dbId, long tblId) {
|
||||
return new DBObjects(findCatalog(catalogId), findDatabase(catalogId, dbId), findTable(catalogId, dbId, tblId));
|
||||
}
|
||||
|
||||
public static Column findColumn(long catalogId, long dbId, long tblId, long idxId, String columnName) {
|
||||
CatalogIf<DatabaseIf<TableIf>> catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalogIf =
|
||||
Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
|
||||
if (catalogIf == null) {
|
||||
return null;
|
||||
}
|
||||
DatabaseIf<TableIf> db = catalogIf.getDb(dbId).orElse(null);
|
||||
DatabaseIf<? extends TableIf> db = catalogIf.getDb(dbId).orElse(null);
|
||||
if (db == null) {
|
||||
return null;
|
||||
}
|
||||
@ -361,6 +366,16 @@ public class StatisticsUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static TableIf findTable(long catalogId, long dbId, long tblId) {
|
||||
try {
|
||||
DatabaseIf<? extends TableIf> db = findDatabase(catalogId, dbId);
|
||||
return db.getTableOrException(tblId,
|
||||
t -> new RuntimeException("Table: " + t + " not exists"));
|
||||
} catch (Throwable t) {
|
||||
throw new RuntimeException("Table: `" + catalogId + "." + dbId + "." + tblId + "` not exists");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw RuntimeException if database not exists.
|
||||
*/
|
||||
@ -371,6 +386,12 @@ public class StatisticsUtil {
|
||||
d -> new RuntimeException("DB: " + d + " not exists"));
|
||||
}
|
||||
|
||||
public static DatabaseIf<? extends TableIf> findDatabase(long catalogId, long dbId) {
|
||||
CatalogIf<? extends DatabaseIf<? extends TableIf>> catalog = findCatalog(catalogId);
|
||||
return catalog.getDbOrException(dbId,
|
||||
d -> new RuntimeException("DB: " + d + " not exists"));
|
||||
}
|
||||
|
||||
/**
|
||||
* Throw RuntimeException if catalog not exists.
|
||||
*/
|
||||
@ -380,6 +401,11 @@ public class StatisticsUtil {
|
||||
.getCatalogOrException(catalogName, c -> new RuntimeException("Catalog: " + c + " not exists"));
|
||||
}
|
||||
|
||||
public static CatalogIf<? extends DatabaseIf<? extends TableIf>> findCatalog(long catalogId) {
|
||||
return Env.getCurrentEnv().getCatalogMgr().getCatalogOrException(catalogId,
|
||||
c -> new RuntimeException("Catalog: " + c + " not exists"));
|
||||
}
|
||||
|
||||
public static boolean isNullOrEmpty(String str) {
|
||||
return Optional.ofNullable(str)
|
||||
.map(String::trim)
|
||||
@ -764,6 +790,17 @@ public class StatisticsUtil {
|
||||
return table instanceof ExternalTable;
|
||||
}
|
||||
|
||||
public static boolean isExternalTable(long catalogId, long dbId, long tblId) {
|
||||
TableIf table;
|
||||
try {
|
||||
table = findTable(catalogId, dbId, tblId);
|
||||
} catch (Throwable e) {
|
||||
LOG.warn(e.getMessage());
|
||||
return false;
|
||||
}
|
||||
return table instanceof ExternalTable;
|
||||
}
|
||||
|
||||
public static boolean inAnalyzeTime(LocalTime now) {
|
||||
try {
|
||||
Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
|
||||
|
||||
Reference in New Issue
Block a user