[feature](statistics) Support for collecting statistics on materialized view (#14676)

1. Map multiple tasks to one job (a sketch of this mapping follows the commit metadata below)
2. Remove the code for analyzing the whole default db: the feature is not available, it would create too many tasks, and the related code was confusing
3. Support analyzing materialized views
4. Abstract the common logic into BaseAnalysisTask
Authored by Kikyou1997 on 2022-12-01 22:34:13 +08:00, committed by GitHub
parent 94a6ffb906
commit e5000c708e
41 changed files with 1230 additions and 795 deletions
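The core structural change is that a single ANALYZE job now fans out into multiple tasks: one per column, plus one per materialized index (materialized view) defined on the table, all grouped under the same job id. Below is a minimal, hedged sketch of that mapping under simplified, placeholder names; the actual implementation is AnalysisManager.createAnalysisJob later in this diff.

// Hedged sketch (not the actual Doris code): how one ANALYZE job maps to many tasks.
// SimpleAnalysisDemo and the string-valued task descriptions are placeholders.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class SimpleAnalysisDemo {
    private static final AtomicLong ID_GEN = new AtomicLong(); // stands in for Env.getNextId()
    // jobId -> (taskId -> task description), mirroring analysisJobIdToTaskMap in AnalysisManager
    private static final Map<Long, Map<Long, String>> JOB_TO_TASKS = new HashMap<>();

    static long createAnalysisJob(List<String> columns, List<String> mvIndexes) {
        long jobId = ID_GEN.incrementAndGet();
        Map<Long, String> tasks = new HashMap<>();
        for (String col : columns) {          // one COLUMN task per analyzed column
            tasks.put(ID_GEN.incrementAndGet(), "COLUMN:" + col);
        }
        for (String mv : mvIndexes) {         // one INDEX task per materialized view
            tasks.put(ID_GEN.incrementAndGet(), "INDEX:" + mv);
        }
        JOB_TO_TASKS.put(jobId, tasks);
        return jobId;
    }

    public static void main(String[] args) {
        long jobId = createAnalysisJob(List.of("c1", "c2"), List.of("mv1"));
        System.out.println(JOB_TO_TASKS.get(jobId)); // three tasks grouped under one job id
    }
}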

View File

@ -2619,7 +2619,7 @@ show_create_routine_load_stmt ::=
// analyze statement
analyze_stmt ::=
KW_ANALYZE opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
KW_ANALYZE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
{:
RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties);
:}

View File

@ -23,7 +23,6 @@ import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
@ -39,14 +38,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -73,198 +70,69 @@ public class AnalyzeStmt extends DdlStmt {
private static final Predicate<Long> DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 0L;
private final TableName optTableName;
public final boolean wholeTbl;
private final TableName tableName;
private TableIf table;
private final PartitionNames optPartitionNames;
private List<String> optColumnNames;
private Map<String, String> optProperties;
// after analyzed
private long dbId;
private final Set<Long> tblIds = Sets.newHashSet();
private final List<String> partitionNames = Lists.newArrayList();
// TODO(wzt): support multiple tables
public AnalyzeStmt(TableName optTableName,
public AnalyzeStmt(TableName tableName,
List<String> optColumnNames,
PartitionNames optPartitionNames,
Map<String, String> optProperties) {
this.optTableName = optTableName;
this.tableName = tableName;
this.optColumnNames = optColumnNames;
this.optPartitionNames = optPartitionNames;
wholeTbl = CollectionUtils.isEmpty(optColumnNames);
this.optProperties = optProperties;
}
public long getDbId() {
Preconditions.checkArgument(isAnalyzed(),
"The dbId must be obtained after the parsing is complete");
return dbId;
}
public Set<Long> getTblIds() {
Preconditions.checkArgument(isAnalyzed(),
"The tblIds must be obtained after the parsing is complete");
return tblIds;
}
public Database getDb() throws AnalysisException {
Preconditions.checkArgument(isAnalyzed(),
"The db must be obtained after the parsing is complete");
return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
}
public List<Table> getTables() throws AnalysisException {
Preconditions.checkArgument(isAnalyzed(),
"The tables must be obtained after the parsing is complete");
Database db = getDb();
List<Table> tables = Lists.newArrayList();
db.readLock();
try {
for (Long tblId : tblIds) {
Table table = db.getTableOrAnalysisException(tblId);
tables.add(table);
}
} finally {
db.readUnlock();
}
return tables;
}
public List<String> getPartitionNames() {
Preconditions.checkArgument(isAnalyzed(),
"The partitionNames must be obtained after the parsing is complete");
return partitionNames;
}
/**
* The statistics task obtains partitions and then collects partition statistics,
* we need to filter out partitions that do not have data.
*
* @return map of tableId and partitionName
* @throws AnalysisException not analyzed
*/
public Map<Long, List<String>> getTableIdToPartitionName() throws AnalysisException {
Preconditions.checkArgument(isAnalyzed(),
"The partitionIds must be obtained after the parsing is complete");
Map<Long, List<String>> tableIdToPartitionName = Maps.newHashMap();
for (Table table : getTables()) {
table.readLock();
try {
OlapTable olapTable = (OlapTable) table;
List<String> partitionNames = getPartitionNames();
List<String> newPartitionNames = new ArrayList<>(partitionNames);
if (newPartitionNames.isEmpty() && olapTable.isPartitioned()) {
newPartitionNames.addAll(olapTable.getPartitionNames());
}
tableIdToPartitionName.put(table.getId(), newPartitionNames);
} finally {
table.readUnlock();
}
}
return tableIdToPartitionName;
}
public Map<Long, List<String>> getTableIdToColumnName() throws AnalysisException {
Preconditions.checkArgument(isAnalyzed(),
"The db name must be obtained after the parsing is complete");
Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
List<Table> tables = getTables();
if (optColumnNames == null || optColumnNames.isEmpty()) {
for (Table table : tables) {
table.readLock();
try {
long tblId = table.getId();
List<Column> baseSchema = table.getBaseSchema();
List<String> colNames = Lists.newArrayList();
baseSchema.stream().map(Column::getName).forEach(colNames::add);
tableIdToColumnName.put(tblId, colNames);
} finally {
table.readUnlock();
}
}
} else {
for (Long tblId : tblIds) {
tableIdToColumnName.put(tblId, optColumnNames);
}
}
return tableIdToColumnName;
}
public Map<String, String> getProperties() {
return optProperties;
}
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
// step1: analyze db, table and column
if (optTableName != null) {
optTableName.analyze(analyzer);
tableName.analyze(analyzer);
String catalogName = optTableName.getCtl();
String dbName = optTableName.getDb();
String tblName = optTableName.getTbl();
CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
TableIf table = db.getTableOrAnalysisException(tblName);
String catalogName = tableName.getCtl();
String dbName = tableName.getDb();
String tblName = tableName.getTbl();
CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
table = db.getTableOrAnalysisException(tblName);
checkAnalyzePriv(dbName, tblName);
checkAnalyzePriv(dbName, tblName);
if (optColumnNames != null && !optColumnNames.isEmpty()) {
table.readLock();
try {
List<String> baseSchema = table.getBaseSchema(false)
.stream().map(Column::getName).collect(Collectors.toList());
Optional<String> optional = optColumnNames.stream()
.filter(entity -> !baseSchema.contains(entity)).findFirst();
if (optional.isPresent()) {
String columnName = optional.get();
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
columnName, FeNameFormat.getColumnNameRegex());
}
} finally {
table.readUnlock();
}
} else {
optColumnNames = table.getBaseSchema(false)
.stream().map(Column::getName).collect(Collectors.toList());
}
dbId = db.getId();
tblIds.add(table.getId());
} else {
// analyze the current default db
String dbName = analyzer.getDefaultDb();
if (Strings.isNullOrEmpty(dbName)) {
ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
}
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
db.readLock();
if (optColumnNames != null && !optColumnNames.isEmpty()) {
table.readLock();
try {
List<Table> tables = db.getTables();
for (Table table : tables) {
checkAnalyzeType(table);
checkAnalyzePriv(dbName, table.getName());
}
dbId = db.getId();
for (Table table : tables) {
long tblId = table.getId();
tblIds.add(tblId);
List<String> baseSchema = table.getBaseSchema(false)
.stream().map(Column::getName).collect(Collectors.toList());
Optional<String> optional = optColumnNames.stream()
.filter(entity -> !baseSchema.contains(entity)).findFirst();
if (optional.isPresent()) {
String columnName = optional.get();
ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
columnName, FeNameFormat.getColumnNameRegex());
}
} finally {
db.readUnlock();
table.readUnlock();
}
} else {
optColumnNames = table.getBaseSchema(false)
.stream().map(Column::getName).collect(Collectors.toList());
}
dbId = db.getId();
// step2: analyze partition
checkPartitionNames();
// step3: analyze properties
checkProperties();
}
@ -286,18 +154,12 @@ public class AnalyzeStmt extends DdlStmt {
}
}
private void checkAnalyzeType(Table table) throws AnalysisException {
if (table.getType() != Table.TableType.OLAP) {
throw new AnalysisException("Only OLAP table statistics are supported");
}
}
private void checkPartitionNames() throws AnalysisException {
if (optPartitionNames != null) {
optPartitionNames.analyze(analyzer);
if (optTableName != null) {
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(optTableName.getDb());
OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(optTableName.getTbl());
if (tableName != null) {
Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tableName.getDb());
OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(tableName.getTbl());
if (!olapTable.isPartitioned()) {
throw new AnalysisException("Not a partitioned table: " + olapTable.getName());
}
@ -340,9 +202,9 @@ public class AnalyzeStmt extends DdlStmt {
StringBuilder sb = new StringBuilder();
sb.append("ANALYZE");
if (optTableName != null) {
if (tableName != null) {
sb.append(" ");
sb.append(optTableName.toSql());
sb.append(tableName.toSql());
}
if (optColumnNames != null) {
@ -369,18 +231,46 @@ public class AnalyzeStmt extends DdlStmt {
}
public String getCatalogName() {
return optTableName.getCtl();
return tableName.getCtl();
}
public String getDBName() {
return optTableName.getDb();
return tableName.getDb();
}
public String getTblName() {
return optTableName.getTbl();
public TableName getTblName() {
return tableName;
}
public List<String> getOptColumnNames() {
return optColumnNames;
}
public long getDbId() {
Preconditions.checkArgument(isAnalyzed(),
"The dbId must be obtained after the parsing is complete");
return dbId;
}
public Database getDb() throws AnalysisException {
Preconditions.checkArgument(isAnalyzed(),
"The db must be obtained after the parsing is complete");
return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
}
public TableIf getTable() {
return table;
}
public List<String> getPartitionNames() {
Preconditions.checkArgument(isAnalyzed(),
"The partitionNames must be obtained after the parsing is complete");
return partitionNames;
}
public Map<String, String> getProperties() {
return optProperties;
}
}

View File

@ -132,4 +132,7 @@ public class SelectListItem {
return expr.toColumnLabel();
}
public void setAlias(String alias) {
this.alias = alias;
}
}

View File

@ -1833,18 +1833,26 @@ public class SelectStmt extends QueryStmt {
if (selectList.isDistinct()) {
strBuilder.append("DISTINCT ");
}
for (int i = 0; i < resultExprs.size(); ++i) {
// strBuilder.append(selectList.getItems().get(i).toSql());
// strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : "");
if (i != 0) {
strBuilder.append(", ");
ConnectContext ctx = ConnectContext.get();
if (ctx == null || ctx.getSessionVariable().internalSession) {
for (int i = 0; i < selectList.getItems().size(); i++) {
strBuilder.append(selectList.getItems().get(i).toSql());
strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : "");
}
if (needToSql) {
strBuilder.append(originalExpr.get(i).toSql());
} else {
strBuilder.append(resultExprs.get(i).toSql());
} else {
for (int i = 0; i < resultExprs.size(); ++i) {
// strBuilder.append(selectList.getItems().get(i).toSql());
// strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : "");
if (i != 0) {
strBuilder.append(", ");
}
if (needToSql) {
strBuilder.append(originalExpr.get(i).toSql());
} else {
strBuilder.append(resultExprs.get(i).toSql());
}
strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i)));
}
strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i)));
}
// From clause

View File

@ -47,6 +47,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;
/**
* Superclass of all table references, including references to views, base tables
@ -226,6 +227,13 @@ public class TableRef implements ParseNode, Writable {
output.append("[").append(Joiner.on(", ").join(joinHints)).append("] ");
}
output.append(tableRefToSql()).append(" ");
if (partitionNames != null) {
StringJoiner sj = new StringJoiner(",", "", " ");
for (String partName : partitionNames.getPartitionNames()) {
sj.add(partName);
}
output.append(sj.toString());
}
if (usingColNames != null) {
output.append("USING (").append(Joiner.on(", ").join(usingColNames)).append(")");
} else if (onClause != null) {
@ -864,4 +872,12 @@ public class TableRef implements ParseNode, Writable {
aliases = new String[]{alias};
}
}
public void setPartitionNames(PartitionNames partitionNames) {
this.partitionNames = partitionNames;
}
public void setName(TableName name) {
this.name = name;
}
}

View File

@ -208,12 +208,12 @@ import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.Tag;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsJobManager;
import org.apache.doris.statistics.StatisticsJobScheduler;
import org.apache.doris.statistics.StatisticsManager;
import org.apache.doris.statistics.StatisticsRepository;
import org.apache.doris.statistics.StatisticsTaskScheduler;
import org.apache.doris.system.Backend;
import org.apache.doris.system.FQDNManager;
@ -441,9 +441,7 @@ public class Env {
private MTMVJobManager mtmvJobManager;
private final AnalysisJobScheduler analysisJobScheduler;
private final StatisticsCache statisticsCache;
private AnalysisManager analysisManager;
private ExternalMetaCacheMgr extMetaCacheMgr;
@ -647,10 +645,9 @@ public class Env {
this.refreshManager = new RefreshManager();
this.policyMgr = new PolicyMgr();
this.mtmvJobManager = new MTMVJobManager();
this.analysisJobScheduler = new AnalysisJobScheduler();
this.statisticsCache = new StatisticsCache();
this.extMetaCacheMgr = new ExternalMetaCacheMgr();
this.fqdnManager = new FQDNManager(systemInfo);
this.analysisManager = new AnalysisManager();
}
public static void destroyCheckpoint() {
@ -1653,7 +1650,7 @@ public class Env {
}
public StatisticsCache getStatisticsCache() {
return statisticsCache;
return analysisManager.getStatisticsCache();
}
public boolean hasReplayer() {
@ -5233,8 +5230,8 @@ public class Env {
return count;
}
public AnalysisJobScheduler getAnalysisJobScheduler() {
return analysisJobScheduler;
public AnalysisTaskScheduler getAnalysisJobScheduler() {
return analysisManager.taskScheduler;
}
// TODO:
@ -5242,6 +5239,11 @@ public class Env {
// 2. support sample job
// 3. support period job
public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
StatisticsRepository.createAnalysisJob(analyzeStmt);
analysisManager.createAnalysisJob(analyzeStmt);
}
public AnalysisManager getAnalysisManager() {
return analysisManager;
}
}

View File

@ -44,24 +44,22 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
public class InternalSchemaInitializer extends Thread {
private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class);
public static boolean forTest = false;
/**
* If internal table creation failed, will retry after below seconds.
*/
public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1;
public void run() {
if (forTest) {
if (FeConstants.runningUnitTest) {
return;
}
while (true) {
while (!created()) {
FrontendNodeType feType = Env.getCurrentEnv().getFeType();
if (feType.equals(FrontendNodeType.INIT) || feType.equals(FrontendNodeType.UNKNOWN)) {
LOG.warn("FE is not ready");
@ -72,7 +70,6 @@ public class InternalSchemaInitializer extends Thread {
.join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L);
createDB();
createTbl();
break;
} catch (Throwable e) {
LOG.warn("Statistics storage initiated failed, will try again later", e);
}
@ -145,10 +142,12 @@ public class InternalSchemaInitializer extends Thread {
FeConstants.INTERNAL_DB_NAME, StatisticConstants.ANALYSIS_JOB_TABLE);
List<ColumnDef> columnDefs = new ArrayList<>();
columnDefs.add(new ColumnDef("job_id", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("task_id", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("catalog_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("db_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024)));
columnDefs.add(new ColumnDef("index_id", TypeDef.create(PrimitiveType.BIGINT)));
columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("analysis_type", TypeDef.createVarchar(32)));
columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024)));
@ -175,4 +174,16 @@ public class InternalSchemaInitializer extends Thread {
return createTableStmt;
}
private boolean created() {
Optional<Database> optionalDatabase =
Env.getCurrentEnv().getInternalCatalog()
.getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME);
if (!optionalDatabase.isPresent()) {
return false;
}
Database db = optionalDatabase.get();
return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent()
&& db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent();
}
}

View File

@ -47,9 +47,12 @@ import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TCompressionType;
@ -997,8 +1000,11 @@ public class OlapTable extends Table {
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
return new AnalysisJob(scheduler, info);
public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
if (info.analysisType.equals(AnalysisType.COLUMN)) {
return new OlapAnalysisTask(scheduler, info);
}
return new MVAnalysisTask(scheduler, info);
}
@Override
@ -1939,4 +1945,5 @@ public class OlapTable extends Table {
public Set<Long> getPartitionKeys() {
return idToPartition.keySet();
}
}

View File

@ -26,9 +26,9 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SqlUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.external.hudi.HudiTable;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.base.Preconditions;
@ -508,7 +508,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
throw new NotImplementedException();
}

View File

@ -20,9 +20,9 @@ package org.apache.doris.catalog;
import org.apache.doris.alter.AlterCancelException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.thrift.TTableDescriptor;
import java.util.Collections;
@ -111,7 +111,7 @@ public interface TableIf {
TTableDescriptor toThrift();
AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info);
BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info);
/**
* Doris table type.

View File

@ -29,9 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalSchemaCache;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.gson.annotations.SerializedName;
@ -301,7 +301,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
throw new NotImplementedException();
}

View File

@ -22,11 +22,11 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.datasource.PooledHiveMetaStoreClient;
import org.apache.doris.statistics.AnalysisJob;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisJobScheduler;
import org.apache.doris.statistics.HiveAnalysisJob;
import org.apache.doris.statistics.IcebergAnalysisJob;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskScheduler;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HiveAnalysisTask;
import org.apache.doris.statistics.IcebergAnalysisTask;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@ -268,13 +268,13 @@ public class HMSExternalTable extends ExternalTable {
}
@Override
public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
makeSureInitialized();
switch (dlaType) {
case HIVE:
return new HiveAnalysisJob(scheduler, info);
return new HiveAnalysisTask(scheduler, info);
case ICEBERG:
return new IcebergAnalysisJob(scheduler, info);
return new IcebergAnalysisTask(scheduler, info);
default:
throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
}

View File

@ -1791,12 +1791,13 @@ public class StmtExecutor implements ProfileWriter {
public List<ResultRow> executeInternalQuery() {
try {
List<ResultRow> resultRows = new ArrayList<>();
analyzer = new Analyzer(context.getEnv(), context);
try {
analyze(context.getSessionVariable().toThrift());
} catch (UserException e) {
LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
return null;
return resultRows;
}
planner.getFragments();
RowBatch batch;
@ -1821,7 +1822,6 @@ public class StmtExecutor implements ProfileWriter {
}
Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
.setParent(Context.current()).startSpan();
List<ResultRow> resultRows = new ArrayList<>();
try (Scope scope = fetchResultSpan.makeCurrent()) {
while (true) {
batch = coord.getNext();
@ -1834,7 +1834,7 @@ public class StmtExecutor implements ProfileWriter {
} catch (Exception e) {
LOG.warn("Unexpected exception when SQL running", e);
fetchResultSpan.recordException(e);
return null;
return resultRows;
} finally {
fetchResultSpan.end();
}

View File

@ -1,207 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.text.StringSubstitutor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class AnalysisJob {
private final AnalysisJobScheduler analysisJobScheduler;
protected final AnalysisJobInfo info;
protected CatalogIf catalog;
protected DatabaseIf db;
protected TableIf tbl;
protected Column col;
protected StmtExecutor stmtExecutor;
public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
this.analysisJobScheduler = analysisJobScheduler;
this.info = info;
init(info);
}
private void init(AnalysisJobInfo info) {
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
if (catalog == null) {
analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
return;
}
db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
if (db == null) {
analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED,
String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis());
return;
}
tbl = (TableIf) db.getTable(info.tblName).orElse(null);
if (tbl == null) {
analysisJobScheduler.updateJobStatus(
info.jobId, JobState.FAILED,
String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
}
col = tbl.getColumn(info.colName);
if (col == null) {
analysisJobScheduler.updateJobStatus(
info.jobId, JobState.FAILED, String.format("Column with name %s not exists", info.tblName),
System.currentTimeMillis());
}
}
private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
+ "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "'${colId}' AS col_id, "
+ "${partId} AS part_id, "
+ "COUNT(1) AS row_count, "
+ "NDV(${colName}) AS ndv, "
+ "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ "MIN(${colName}) AS min, "
+ "MAX(${colName}) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW()"
+ "FROM `${dbName}`.`${tblName}` "
+ "PARTITION ${partName}";
private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, "
+ " ndv, null_count, min, max, data_size, update_time\n"
+ " FROM \n"
+ " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
+ " SUM(count) AS row_count, \n"
+ " SUM(null_count) AS null_count, "
+ " MIN(CAST(min AS ${type})) AS min, "
+ " MAX(CAST(max AS ${type})) AS max, "
+ " SUM(data_size_in_bytes) AS data_size, "
+ " NOW() AS update_time\n"
+ " FROM ${internalDB}.${columnStatTbl}"
+ " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
+ " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+ " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
+ " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
+ " ) t1, \n"
+ " (SELECT NDV(${colName}) AS ndv FROM `${dbName}`.`${tblName}`) t2\n";
private String getDataSizeFunction() {
if (col.getType().isStringType()) {
return "SUM(LENGTH(${colName}))";
}
return "COUNT(1) * " + col.getType().getSlotSize();
}
public void execute() throws Exception {
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("colId", String.valueOf(info.colName));
params.put("dataSizeFunction", getDataSizeFunction());
params.put("dbName", info.dbName);
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = ((Table) tbl).getPartitionNames();
for (String partName : partNames) {
Partition part = ((Table) tbl).getPartition(partName);
if (part == null) {
continue;
}
params.put("partId", String.valueOf(((Table) tbl).getPartition(partName).getId()));
params.put("partName", String.valueOf(partName));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
for (String sql : partitionAnalysisSQLs) {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
this.stmtExecutor.execute();
}
}
params.remove("partId");
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
this.stmtExecutor.execute();
Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
}
}
public int getLastExecTime() {
return info.lastExecTimeInMs;
}
public void cancel() {
if (stmtExecutor != null) {
stmtExecutor.cancel();
}
analysisJobScheduler
.updateJobStatus(info.jobId, JobState.FAILED,
String.format("Job has been cancelled: %s", info.toString()), -1);
}
public void updateState(JobState jobState) {
info.updateState(jobState);
}
public long getJobId() {
return info.jobId;
}
}

View File

@ -0,0 +1,145 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.FeConstants;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class AnalysisManager {
private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);
private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
+ "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
private final ConcurrentMap<Long, Map<Long, AnalysisTaskInfo>> analysisJobIdToTaskMap;
public final AnalysisTaskScheduler taskScheduler;
private StatisticsCache statisticsCache;
private final AnalysisTaskExecutor taskExecutor;
public AnalysisManager() {
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
this.taskScheduler = new AnalysisTaskScheduler();
taskExecutor = new AnalysisTaskExecutor(taskScheduler);
this.statisticsCache = new StatisticsCache();
taskExecutor.start();
}
public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
String catalogName = analyzeStmt.getCatalogName();
String db = analyzeStmt.getDBName();
TableName tbl = analyzeStmt.getTblName();
StatisticsUtil.convertTableNameToObjects(tbl);
List<String> colNames = analyzeStmt.getOptColumnNames();
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
long jobId = Env.getCurrentEnv().getNextId();
if (colNames != null) {
for (String colName : colNames) {
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
.setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
.setTblName(tbl.getTbl()).setColName(colName).setJobType(JobType.MANUAL)
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.COLUMN)
.setScheduleType(ScheduleType.ONCE).build();
try {
StatisticsRepository.createAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
throw new RuntimeException("Failed to create analysis job", e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
}
if (analyzeStmt.wholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
try {
olapTable.readLock();
for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
if (meta.getDefineStmt() == null) {
continue;
}
long taskId = Env.getCurrentEnv().getNextId();
AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
jobId).setTaskId(taskId)
.setCatalogName(catalogName).setDbName(db)
.setTblName(tbl.getTbl()).setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
.setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
.setScheduleType(ScheduleType.ONCE).build();
try {
StatisticsRepository.createAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
throw new RuntimeException("Failed to create analysis job", e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
} finally {
olapTable.readUnlock();
}
}
analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
analysisTaskInfos.values().forEach(taskScheduler::schedule);
}
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
Map<String, String> params = new HashMap<>();
params.put("jobState", jobState.toString());
params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
params.put("jobId", String.valueOf(info.jobId));
try {
StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
} catch (Exception e) {
LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
} finally {
info.state = jobState;
if (analysisJobIdToTaskMap.get(info.jobId).values()
.stream().allMatch(i -> i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) {
analysisJobIdToTaskMap.remove(info.jobId);
}
}
}
public StatisticsCache getStatisticsCache() {
return statisticsCache;
}
}

View File

@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
public enum AnalysisState {
PENDING,
RUNNING,
FINISHED,
FAILED;
}
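FINISHED and FAILED are the terminal states: once every task of a job has left PENDING/RUNNING, AnalysisManager (above) drops the job from its in-memory map. A small hedged sketch of that cleanup condition follows; the helper name jobDone is hypothetical and not part of this commit.

import java.util.EnumSet;
import java.util.List;

class AnalysisStateDemo {
    enum AnalysisState { PENDING, RUNNING, FINISHED, FAILED }

    private static final EnumSet<AnalysisState> TERMINAL =
            EnumSet.of(AnalysisState.FINISHED, AnalysisState.FAILED);

    // Hypothetical helper mirroring the allMatch(...) check in AnalysisManager.updateTaskStatus:
    // a job can be removed once none of its tasks is still PENDING or RUNNING.
    static boolean jobDone(List<AnalysisState> taskStates) {
        return taskStates.stream().allMatch(TERMINAL::contains);
    }

    public static void main(String[] args) {
        System.out.println(jobDone(List.of(AnalysisState.FINISHED, AnalysisState.FAILED)));  // true
        System.out.println(jobDone(List.of(AnalysisState.FINISHED, AnalysisState.RUNNING))); // false
    }
}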

View File

@ -17,10 +17,10 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.doris.statistics.util.BlockingCounter;
import org.apache.logging.log4j.LogManager;
@ -33,9 +33,9 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AnalysisJobExecutor extends Thread {
public class AnalysisTaskExecutor extends Thread {
private static final Logger LOG = LogManager.getLogger(AnalysisJobExecutor.class);
private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);
private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool(
Config.statistics_simultaneously_running_job_num,
@ -44,17 +44,17 @@ public class AnalysisJobExecutor extends Thread {
new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE),
"Analysis Job Executor", true);
private final AnalysisJobScheduler jobScheduler;
private final AnalysisTaskScheduler taskScheduler;
private final BlockingCounter blockingCounter =
new BlockingCounter(Config.statistics_simultaneously_running_job_num);
private final BlockingQueue<AnalysisJobWrapper> jobQueue =
new PriorityBlockingQueue<AnalysisJobWrapper>(20,
Comparator.comparingLong(AnalysisJobWrapper::getStartTime));
private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
public AnalysisJobExecutor(AnalysisJobScheduler jobExecutor) {
this.jobScheduler = jobExecutor;
public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
this.taskScheduler = jobExecutor;
}
@Override
@ -73,12 +73,12 @@ public class AnalysisJobExecutor extends Thread {
private void doCancelExpiredJob() {
for (;;) {
try {
AnalysisJobWrapper jobWrapper = jobQueue.take();
AnalysisTaskWrapper taskWrapper = jobQueue.take();
try {
long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
jobWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
jobWrapper.cancel();
taskWrapper.cancel();
}
} catch (Throwable throwable) {
LOG.warn(throwable);
@ -101,11 +101,13 @@ public class AnalysisJobExecutor extends Thread {
}
private void doFetchAndExecute() {
AnalysisJob job = jobScheduler.getPendingJobs();
AnalysisJobWrapper jobWrapper = new AnalysisJobWrapper(this, job);
BaseAnalysisTask task = taskScheduler.getPendingTasks();
AnalysisTaskWrapper jobWrapper = new AnalysisTaskWrapper(this, task);
incr();
jobScheduler.updateJobStatus(job.getJobId(), JobState.RUNNING, "", -1);
executors.submit(jobWrapper);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.RUNNING, "", System.currentTimeMillis());
}
public void decr() {
@ -116,7 +118,7 @@ public class AnalysisJobExecutor extends Thread {
blockingCounter.incr();
}
public void putJob(AnalysisJobWrapper wrapper) throws Exception {
public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
jobQueue.put(wrapper);
}
}

View File

@ -20,23 +20,21 @@ package org.apache.doris.statistics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Objects;
import java.util.StringJoiner;
public class AnalysisJobInfo {
public class AnalysisTaskInfo {
private static final Logger LOG = LogManager.getLogger(AnalysisJobInfo.class);
private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class);
public enum JobState {
PENDING,
RUNNING,
FINISHED,
FAILED;
public enum AnalysisMethod {
SAMPLE,
FULL
}
public enum AnalysisType {
SAMPLE,
FULL;
COLUMN,
INDEX
}
public enum JobType {
@ -53,6 +51,8 @@ public class AnalysisJobInfo {
public final long jobId;
public final long taskId;
public final String catalogName;
public final String dbName;
@ -61,56 +61,43 @@ public class AnalysisJobInfo {
public final String colName;
public final Long indexId;
public final JobType jobType;
public AnalysisType analysisType;
public final AnalysisMethod analysisMethod;
public final AnalysisType analysisType;
public String message;
// finished or failed
public int lastExecTimeInMs = 0;
private JobState state;
public AnalysisState state;
public final ScheduleType scheduleType;
public AnalysisJobInfo(long jobId, String catalogName, String dbName, String tblName, String colName,
JobType jobType, ScheduleType scheduleType) {
public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName,
String colName, Long indexId, JobType jobType,
AnalysisMethod analysisMethod, AnalysisType analysisType, String message, int lastExecTimeInMs,
AnalysisState state, ScheduleType scheduleType) {
this.jobId = jobId;
this.taskId = taskId;
this.catalogName = catalogName;
this.dbName = dbName;
this.tblName = tblName;
this.colName = colName;
this.indexId = indexId;
this.jobType = jobType;
this.analysisMethod = analysisMethod;
this.analysisType = analysisType;
this.message = message;
this.lastExecTimeInMs = lastExecTimeInMs;
this.state = state;
this.scheduleType = scheduleType;
}
@Override
public int hashCode() {
return Objects.hash(catalogName, dbName, tblName, colName, analysisType);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
AnalysisJobInfo other = (AnalysisJobInfo) obj;
return catalogName.equals(other.catalogName)
&& dbName.equals(other.dbName)
&& tblName.equals(other.tblName)
&& colName.equals(other.colName)
&& analysisType.equals(other.analysisType);
}
// TODO: log to meta
public void updateState(JobState jobState) {
this.state = jobState;
}
@Override
public String toString() {
StringJoiner sj = new StringJoiner("\n", getClass().getName() + ":\n", "\n");
@ -119,14 +106,15 @@ public class AnalysisJobInfo {
sj.add("DBName: " + dbName);
sj.add("TableName: " + tblName);
sj.add("ColumnName: " + colName);
sj.add("JobType: " + analysisType.toString());
sj.add("TaskType: " + analysisType.toString());
sj.add("TaskMethod: " + analysisMethod.toString());
sj.add("Message: " + message);
sj.add("LastExecTime: " + String.valueOf(lastExecTimeInMs));
sj.add("CurrentState: " + state.toString());
return sj.toString();
}
public JobState getState() {
public AnalysisState getState() {
return state;
}
}

View File

@ -0,0 +1,115 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
public class AnalysisTaskInfoBuilder {
private long jobId;
private long taskId;
private String catalogName;
private String dbName;
private String tblName;
private String colName;
private Long indexId;
private JobType jobType;
private AnalysisMethod analysisMethod;
private AnalysisType analysisType;
private String message;
private int lastExecTimeInMs;
private AnalysisState state;
private ScheduleType scheduleType;
public AnalysisTaskInfoBuilder setJobId(long jobId) {
this.jobId = jobId;
return this;
}
public AnalysisTaskInfoBuilder setTaskId(long taskId) {
this.taskId = taskId;
return this;
}
public AnalysisTaskInfoBuilder setCatalogName(String catalogName) {
this.catalogName = catalogName;
return this;
}
public AnalysisTaskInfoBuilder setDbName(String dbName) {
this.dbName = dbName;
return this;
}
public AnalysisTaskInfoBuilder setTblName(String tblName) {
this.tblName = tblName;
return this;
}
public AnalysisTaskInfoBuilder setColName(String colName) {
this.colName = colName;
return this;
}
public AnalysisTaskInfoBuilder setIndexId(Long indexId) {
this.indexId = indexId;
return this;
}
public AnalysisTaskInfoBuilder setJobType(JobType jobType) {
this.jobType = jobType;
return this;
}
public AnalysisTaskInfoBuilder setAnalysisMethod(AnalysisMethod analysisMethod) {
this.analysisMethod = analysisMethod;
return this;
}
public AnalysisTaskInfoBuilder setAnalysisType(AnalysisType analysisType) {
this.analysisType = analysisType;
return this;
}
public AnalysisTaskInfoBuilder setMessage(String message) {
this.message = message;
return this;
}
public AnalysisTaskInfoBuilder setLastExecTimeInMs(int lastExecTimeInMs) {
this.lastExecTimeInMs = lastExecTimeInMs;
return this;
}
public AnalysisTaskInfoBuilder setState(AnalysisState state) {
this.state = state;
return this;
}
public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) {
this.scheduleType = scheduleType;
return this;
}
public AnalysisTaskInfo build() {
return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colName, indexId, jobType,
analysisMethod, analysisType, message, lastExecTimeInMs, state, scheduleType);
}
}
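For orientation, the sketch below mirrors how AnalysisManager.createAnalysisJob (earlier in this diff) assembles a per-column task with this builder; the ids and the catalog/db/table/column names are placeholder values, not anything taken from the commit.

import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType;
import org.apache.doris.statistics.AnalysisTaskInfoBuilder;

public class AnalysisTaskInfoBuilderUsage {
    // Placeholder ids and names for illustration only.
    static AnalysisTaskInfo exampleColumnTask() {
        return new AnalysisTaskInfoBuilder()
                .setJobId(1L)
                .setTaskId(2L)
                .setCatalogName("internal")
                .setDbName("db1")
                .setTblName("tbl1")
                .setColName("c1")
                .setJobType(JobType.MANUAL)
                .setAnalysisMethod(AnalysisMethod.FULL)
                .setAnalysisType(AnalysisType.COLUMN)
                .setScheduleType(ScheduleType.ONCE)
                .build();
    }
}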

View File

@ -20,94 +20,63 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
public class AnalysisJobScheduler {
public class AnalysisTaskScheduler {
private static final Logger LOG = LogManager.getLogger(AnalysisJobScheduler.class);
private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
+ "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
new PriorityQueue<>(Comparator.comparingInt(BaseAnalysisTask::getLastExecTime));
private final PriorityQueue<AnalysisJob> systemJobQueue =
new PriorityQueue<AnalysisJob>(Comparator.comparingInt(AnalysisJob::getLastExecTime));
private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
private final Queue<AnalysisJob> manualJobQueue = new LinkedList<>();
private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
private final Set<AnalysisJob> systemJobSet = new HashSet<>();
private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
private final Set<AnalysisJob> manualJobSet = new HashSet<>();
private final AnalysisJobExecutor jobExecutor = new AnalysisJobExecutor(this);
{
jobExecutor.start();
}
public void updateJobStatus(long jobId, JobState jobState, String message, long time) {
Map<String, String> params = new HashMap<>();
params.put("jobState", jobState.toString());
params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
params.put("jobId", String.valueOf(jobId));
try {
StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
} catch (Exception e) {
LOG.warn(String.format("Failed to update state for job: %s", jobId), e);
}
}
public synchronized void scheduleJobs(List<AnalysisJobInfo> analysisJobInfos) {
for (AnalysisJobInfo job : analysisJobInfos) {
public synchronized void scheduleJobs(List<AnalysisTaskInfo> analysisJobInfos) {
for (AnalysisTaskInfo job : analysisJobInfos) {
schedule(job);
}
}
public synchronized void schedule(AnalysisJobInfo analysisJobInfo) {
public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName);
Preconditions.checkArgument(catalog != null);
DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName);
Preconditions.checkArgument(db != null);
TableIf table = db.getTableNullable(analysisJobInfo.tblName);
Preconditions.checkArgument(table != null);
AnalysisJob analysisJob = table.createAnalysisJob(this, analysisJobInfo);
addToManualJobQueue(analysisJob);
BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo);
addToManualJobQueue(analysisTask);
if (analysisJobInfo.jobType.equals(JobType.MANUAL)) {
return;
}
addToSystemQueue(analysisJob);
addToSystemQueue(analysisTask);
}
private void removeFromSystemQueue(AnalysisJob analysisJobInfo) {
private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) {
if (manualJobSet.contains(analysisJobInfo)) {
systemJobQueue.remove(analysisJobInfo);
manualJobSet.remove(analysisJobInfo);
}
}
private void addToSystemQueue(AnalysisJob analysisJobInfo) {
private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) {
if (systemJobSet.contains(analysisJobInfo)) {
return;
}
@ -116,7 +85,7 @@ public class AnalysisJobScheduler {
notify();
}
private void addToManualJobQueue(AnalysisJob analysisJobInfo) {
private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) {
if (manualJobSet.contains(analysisJobInfo)) {
return;
}
@ -125,7 +94,7 @@ public class AnalysisJobScheduler {
notify();
}
public synchronized AnalysisJob getPendingJobs() {
public synchronized BaseAnalysisTask getPendingTasks() {
while (true) {
if (!manualJobQueue.isEmpty()) {
return manualJobQueue.poll();

View File

@ -18,60 +18,66 @@
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.FutureTask;
public class AnalysisJobWrapper extends FutureTask<Void> {
public class AnalysisTaskWrapper extends FutureTask<Void> {
private static final Logger LOG = LogManager.getLogger(AnalysisJobWrapper.class);
private static final Logger LOG = LogManager.getLogger(AnalysisTaskWrapper.class);
private final AnalysisJob job;
private final BaseAnalysisTask task;
private long startTime;
private final AnalysisJobExecutor executor;
private final AnalysisTaskExecutor executor;
public AnalysisJobWrapper(AnalysisJobExecutor executor, AnalysisJob job) {
public AnalysisTaskWrapper(AnalysisTaskExecutor executor, BaseAnalysisTask job) {
super(() -> {
job.execute();
return null;
});
this.executor = executor;
this.job = job;
this.task = job;
}
@Override
public void run() {
startTime = System.currentTimeMillis();
Exception except = null;
Throwable except = null;
try {
executor.putJob(this);
super.run();
Object result = get();
if (result instanceof Throwable) {
except = (Throwable) result;
}
} catch (Exception e) {
except = e;
} finally {
executor.decr();
if (except != null) {
Env.getCurrentEnv().getAnalysisJobScheduler()
.updateJobStatus(job.getJobId(), JobState.FAILED, except.getMessage(), -1);
LOG.warn("Failed to execute task", except);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FAILED, except.getMessage(), -1);
} else {
Env.getCurrentEnv().getAnalysisJobScheduler()
.updateJobStatus(job.getJobId(), JobState.FINISHED, "", System.currentTimeMillis());
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FINISHED, "", System.currentTimeMillis());
}
LOG.warn("{} finished, cost time:{}", job.toString(), System.currentTimeMillis() - startTime);
LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
}
}
public boolean cancel() {
try {
LOG.warn("{} cancelled, cost time:{}", job.toString(), System.currentTimeMillis() - startTime);
job.cancel();
LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime);
task.cancel();
} catch (Exception e) {
LOG.warn(String.format("Cancel job failed job info : %s", job.toString()));
LOG.warn(String.format("Cancel job failed job info : %s", task.toString()));
} finally {
executor.decr();
}

View File

@ -0,0 +1,160 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import com.google.common.annotations.VisibleForTesting;
public abstract class BaseAnalysisTask {
protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
+ "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "'${colId}' AS col_id, "
+ "${partId} AS part_id, "
+ "COUNT(1) AS row_count, "
+ "NDV(`${colName}`) AS ndv, "
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ "MIN(`${colName}`) AS min, "
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() ";
protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, "
+ " ndv, null_count, min, max, data_size, update_time\n"
+ " FROM \n"
+ " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
+ " SUM(count) AS row_count, \n"
+ " SUM(null_count) AS null_count, "
+ " MIN(CAST(min AS ${type})) AS min, "
+ " MAX(CAST(max AS ${type})) AS max, "
+ " SUM(data_size_in_bytes) AS data_size, "
+ " NOW() AS update_time\n"
+ " FROM ${internalDB}.${columnStatTbl}"
+ " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
+ " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+ " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
+ " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
+ " ) t1, \n";
protected AnalysisTaskScheduler analysisTaskScheduler;
protected AnalysisTaskInfo info;
protected CatalogIf catalog;
protected DatabaseIf db;
protected TableIf tbl;
protected Column col;
protected StmtExecutor stmtExecutor;
protected AnalysisState analysisState;
@VisibleForTesting
public BaseAnalysisTask() {
}
public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
this.analysisTaskScheduler = analysisTaskScheduler;
this.info = info;
init(info);
}
private void init(AnalysisTaskInfo info) {
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
if (catalog == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis());
return;
}
db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null);
if (db == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis());
return;
}
tbl = (TableIf) db.getTable(info.tblName).orElse(null);
if (tbl == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
info, AnalysisState.FAILED,
String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis());
return;
}
if (info.analysisType != null && info.analysisType.equals(AnalysisType.COLUMN)) {
col = tbl.getColumn(info.colName);
if (col == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(
info, AnalysisState.FAILED, String.format("Column with name %s not exists", info.colName),
System.currentTimeMillis());
}
}
}
public abstract void execute() throws Exception;
public void cancel() {
if (stmtExecutor != null) {
stmtExecutor.cancel();
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
String.format("Job has been cancelled: %s", info.toString()), -1);
}
public int getLastExecTime() {
return info.lastExecTimeInMs;
}
public long getJobId() {
return info.jobId;
}
public AnalysisState getAnalysisState() {
return analysisState;
}
protected String getDataSizeFunction(Column column) {
if (column.getType().isStringType()) {
return "SUM(LENGTH(`${colName}`))";
}
return "COUNT(1) * " + column.getType().getSlotSize();
}
}
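The ${...} placeholders in the two templates above are filled in by the concrete tasks with org.apache.commons.text.StringSubstitutor (see OlapAnalysisTask below). A minimal rendering example; the literal values for the internal DB and statistics table are illustrative stand-ins, not taken from this commit:

import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

public class TemplateRenderSketch {
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("internalDB", "__internal_schema");      // stand-in for FeConstants.INTERNAL_DB_NAME
        params.put("columnStatTbl", "column_statistics");   // stand-in for StatisticConstants.STATISTIC_TBL_NAME
        params.put("colName", "col1");
        // getDataSizeFunction() above produces this fragment for string-typed columns.
        String dataSize = new StringSubstitutor(params).replace("SUM(LENGTH(`${colName}`))");
        System.out.println(dataSize); // prints: SUM(LENGTH(`col1`))
    }
}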

View File

@ -22,12 +22,12 @@ import org.apache.doris.common.Config;
import org.apache.commons.lang.NotImplementedException;
public class HMSAnalysisJob extends AnalysisJob {
public class HMSAnalysisTask extends BaseAnalysisTask {
protected HMSExternalTable table;
public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
public HMSAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
super(analysisTaskScheduler, info);
table = (HMSExternalTable) tbl;
}

View File

@ -43,16 +43,16 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HiveAnalysisJob extends HMSAnalysisJob {
private static final Logger LOG = LogManager.getLogger(HiveAnalysisJob.class);
public class HiveAnalysisTask extends HMSAnalysisTask {
private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class);
public static final String TOTAL_SIZE = "totalSize";
public static final String NUM_ROWS = "numRows";
public static final String NUM_FILES = "numFiles";
public static final String TIMESTAMP = "transient_lastDdlTime";
public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
public HiveAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
super(analysisTaskScheduler, info);
}
private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "

View File

@ -38,14 +38,14 @@ import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class IcebergAnalysisJob extends HMSAnalysisJob {
public class IcebergAnalysisTask extends HMSAnalysisTask {
private long numRows = 0;
private long dataSize = 0;
private long numNulls = 0;
public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) {
super(analysisJobScheduler, info);
public IcebergAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
super(analysisTaskScheduler, info);
}
private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "

View File

@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.base.Preconditions;
import java.io.StringReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
/**
* Analysis task for a materialized view. Only gets constructed when the AnalyzeStmt does not
* specify which columns to analyze.
* TODO: Support multi-table MVs
*/
public class MVAnalysisTask extends BaseAnalysisTask {
private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
+ " FROM (${sql}) mv";
private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
+ " (SELECT NDV(`${colName}`) AS ndv "
+ " FROM (${sql}) mv) t2\n";
private MaterializedIndexMeta meta;
private SelectStmt selectStmt;
private OlapTable olapTable;
public MVAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
super(analysisTaskScheduler, info);
init();
}
private void init() {
olapTable = (OlapTable) tbl;
meta = olapTable.getIndexMetaByIndexId(info.indexId);
Preconditions.checkState(meta != null);
String mvDef = meta.getDefineStmt().originStmt;
SqlScanner input =
new SqlScanner(new StringReader(mvDef), 0L);
SqlParser parser = new SqlParser(input);
CreateMaterializedViewStmt cmv = null;
try {
cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0);
} catch (Exception e) {
throw new RuntimeException(e);
}
selectStmt = cmv.getSelectStmt();
selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
}
@Override
public void execute() throws Exception {
for (Column column : meta.getSchema()) {
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
TableRef tableRef = selectOne.getTableRefs().get(0);
SelectListItem selectItem = selectOne.getSelectList().getItems()
.stream()
.filter(i -> isCorrespondingToColumn(i, column))
.findFirst()
.get();
selectItem.setAlias(column.getName());
Map<String, String> params = new HashMap<>();
for (Partition part : olapTable.getAllPartitions()) {
String partName = part.getName();
PartitionNames partitionName = new PartitionNames(false, Arrays.asList(partName));
tableRef.setPartitionNames(partitionName);
String sql = selectOne.toSql();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(meta.getIndexId()));
String colName = column.getName();
params.put("colId", colName);
long partId = part.getId();
params.put("partId", String.valueOf(partId));
params.put("dataSizeFunction", getDataSizeFunction(column));
params.put("dbName", info.dbName);
params.put("colName", colName);
params.put("tblName", String.valueOf(info.tblName));
params.put("sql", sql);
StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
}
params.remove("partId");
params.put("type", column.getType().toString());
StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
Env.getCurrentEnv().getStatisticsCache().refreshSync(meta.getIndexId(), column.getName());
}
}
// Relies on the fact that a materialized view's select list contains only plain SlotRefs and
// aggregate functions whose first argument is a SlotRef.
private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
Expr expr = item.getExpr();
if (expr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) expr;
return slotRef.getColumnName().equalsIgnoreCase(column.getName());
}
if (expr instanceof FunctionCallExpr) {
FunctionCallExpr func = (FunctionCallExpr) expr;
SlotRef slotRef = (SlotRef) func.getChild(0);
return slotRef.getColumnName().equalsIgnoreCase(column.getName());
}
return false;
}
}

View File

@ -0,0 +1,110 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.text.StringSubstitutor;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Each task analyzes one column.
*/
public class OlapAnalysisTask extends BaseAnalysisTask {
private static final String ANALYZE_PARTITION_SQL_TEMPLATE = INSERT_PART_STATISTICS
+ "FROM `${dbName}`.`${tblName}` "
+ "PARTITION ${partName}";
private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
+ " (SELECT NDV(`${colName}`) AS ndv "
+ " FROM `${dbName}`.`${tblName}`) t2\n";
@VisibleForTesting
public OlapAnalysisTask() {
super();
}
public OlapAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
super(analysisTaskScheduler, info);
}
public void execute() throws Exception {
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("colId", String.valueOf(info.colName));
params.put("dataSizeFunction", getDataSizeFunction(col));
params.put("dbName", info.dbName);
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(info.tblName));
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();
Set<String> partNames = tbl.getPartitionNames();
for (String partName : partNames) {
Partition part = tbl.getPartition(partName);
if (part == null) {
continue;
}
params.put("partId", String.valueOf(tbl.getPartition(partName).getId()));
params.put("partName", String.valueOf(partName));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
execSQLs(partitionAnalysisSQLs);
params.remove("partId");
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName());
}
@VisibleForTesting
public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
for (String sql : partitionAnalysisSQLs) {
execSQL(sql);
}
}
@VisibleForTesting
public void execSQL(String sql) throws Exception {
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
this.stmtExecutor.execute();
}
}
}
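A usage sketch mirroring the updated AnalysisJobTest later in this commit; the catalog, db, table, and column names are borrowed from the tests and purely illustrative. It assumes a running FE with the internal analysis job table created; in production the task is polled from the AnalysisTaskScheduler and run through AnalysisTaskExecutor/AnalysisTaskWrapper instead.

import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;

class OlapAnalysisTaskSketch {
    static void analyzeOneColumn(AnalysisTaskScheduler scheduler) throws Exception {
        AnalysisTaskInfo info = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
                .setCatalogName("internal").setDbName("default_cluster:analysis_job_test")
                .setTblName("t1").setColName("col1")
                .setJobType(JobType.MANUAL)
                .setAnalysisMethod(AnalysisMethod.FULL)
                .setAnalysisType(AnalysisType.COLUMN)
                .build();
        // Issues the per-partition INSERT for every partition, then the column-level INSERT above.
        new OlapAnalysisTask(scheduler, info).execute();
    }
}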

View File

@ -17,7 +17,6 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
@ -260,21 +259,6 @@ public class StatisticsJob {
}
}
/**
* get statisticsJob from analyzeStmt.
* AnalyzeStmt: analyze t1(c1,c2,c3)
* tableId: [t1]
* tableIdToColumnName: {t1: [c1,c2,c3]}
*/
public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws AnalysisException {
long dbId = stmt.getDbId();
Set<Long> tblIds = stmt.getTblIds();
Map<Long, List<String>> tableIdToPartitionName = stmt.getTableIdToPartitionName();
Map<Long, List<String>> tableIdToColumnName = stmt.getTableIdToColumnName();
Map<String, String> properties = stmt.getProperties();
return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, tableIdToColumnName, properties);
}
public List<Comparable> getShowInfo(@Nullable Long tableId) throws AnalysisException {
List<Comparable> result = Lists.newArrayList();

View File

@ -17,20 +17,16 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.ShowAnalyzeStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@ -61,37 +57,6 @@ public class StatisticsJobManager {
return idToStatisticsJob;
}
public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException {
// The current statistics are only used for CBO test,
// and are not available to users. (work in progress)
// TODO(wzt): Further tests are needed
boolean enableCboStatistics = ConnectContext.get()
.getSessionVariable().getEnableCboStatistics();
if (enableCboStatistics) {
// step1: init statistics job by analyzeStmt
StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt);
synchronized (this) {
// step2: check restrict
checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds());
// step3: create it
createStatisticsJob(statisticsJob);
}
} else {
throw new UserException("Statistics are not yet stable, if you want to enable statistics,"
+ " use 'set enable_cbo_statistics=true' to enable it.");
}
}
public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException {
idToStatisticsJob.put(statisticsJob.getId(), statisticsJob);
try {
Env.getCurrentEnv().getStatisticsJobScheduler().addPendingJob(statisticsJob);
} catch (IllegalStateException e) {
LOG.info("The pending statistics job is full. Please submit it again later.");
throw new DdlException("The pending statistics job is full, Please submit it again later.");
}
}
/**
* The statistical job has the following restrict:
* - Rule1: The same table cannot have two unfinished statistics jobs

View File

@ -18,16 +18,12 @@
package org.apache.doris.statistics;
import org.apache.doris.analysis.AlterColumnStatsStmt;
import org.apache.doris.analysis.AnalyzeStmt;
import org.apache.doris.analysis.TableName;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.statistics.AnalysisJobInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisJobInfo.JobState;
import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
@ -71,8 +67,9 @@ public class StatisticsRepository {
+ " WHERE `id` IN (${idList})";
private static final String PERSIST_ANALYSIS_JOB_SQL_TEMPLATE = "INSERT INTO "
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, '${catalogName}', '${dbName}',"
+ "'${tblName}','${colName}', '${jobType}', '${analysisType}', '${message}', '${lastExecTimeInMs}',"
+ FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}',"
+ "'${tblName}','${colName}', ,'${indexId}','${jobType}', '${analysisType}', "
+ "'${message}', '${lastExecTimeInMs}',"
+ "'${state}', '${scheduleType}')";
private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO "
@ -134,39 +131,23 @@ public class StatisticsRepository {
return stringJoiner.toString();
}
public static void createAnalysisJob(AnalyzeStmt analyzeStmt) {
String catalogName = analyzeStmt.getCatalogName();
String db = analyzeStmt.getDBName();
String tbl = analyzeStmt.getTblName();
List<String> colNames = analyzeStmt.getOptColumnNames();
if (colNames != null) {
for (String colName : colNames) {
AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db,
tbl, colName, AnalysisJobInfo.JobType.MANUAL, ScheduleType.ONCE);
analysisJobInfo.analysisType = AnalysisType.FULL;
Map<String, String> params = new HashMap<>();
params.put("jobId", String.valueOf(analysisJobInfo.jobId));
params.put("catalogName", analysisJobInfo.catalogName);
params.put("dbName", analysisJobInfo.dbName);
params.put("tblName", analysisJobInfo.tblName);
params.put("colName", analysisJobInfo.colName);
params.put("jobType", analysisJobInfo.jobType.toString());
params.put("analysisType", analysisJobInfo.analysisType.toString());
params.put("message", "");
params.put("lastExecTimeInMs", "0");
params.put("state", JobState.PENDING.toString());
params.put("scheduleType", analysisJobInfo.scheduleType.toString());
try {
StatisticsUtil.execUpdate(
new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE));
} catch (Exception e) {
LOG.warn("Failed to persite job for column: {}", colName, e);
return;
}
Env.getCurrentEnv().getAnalysisJobScheduler().schedule(analysisJobInfo);
}
}
public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
Map<String, String> params = new HashMap<>();
params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
params.put("taskId", String.valueOf(analysisTaskInfo.taskId));
params.put("catalogName", analysisTaskInfo.catalogName);
params.put("dbName", analysisTaskInfo.dbName);
params.put("tblName", analysisTaskInfo.tblName);
params.put("colName", analysisTaskInfo.colName);
params.put("indexId", String.valueOf(analysisTaskInfo.indexId));
params.put("jobType", analysisTaskInfo.jobType.toString());
params.put("analysisType", analysisTaskInfo.analysisMethod.toString());
params.put("message", "");
params.put("lastExecTimeInMs", "0");
params.put("state", AnalysisState.PENDING.toString());
params.put("scheduleType", analysisTaskInfo.scheduleType.toString());
StatisticsUtil.execUpdate(
new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE));
}
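With an AnalysisTaskInfo built as in the sketch after OlapAnalysisTask, persisting it is a single call; createAnalysisTask renders PERSIST_ANALYSIS_JOB_SQL_TEMPLATE and writes one PENDING row for the task:

// Hypothetical caller; `info` is an AnalysisTaskInfo as in the earlier sketch.
StatisticsRepository.createAnalysisTask(info); // inserts a PENDING row into the analysis job table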
public static void alterColumnStatistics(AlterColumnStatsStmt alterColumnStatsStmt) throws Exception {

View File

@ -17,7 +17,6 @@
package org.apache.doris.statistics.util;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BoolLiteral;
import org.apache.doris.analysis.DateLiteral;
@ -46,7 +45,7 @@ import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisJobInfo;
import org.apache.doris.statistics.AnalysisTaskInfo;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.StatisticConstants;
import org.apache.doris.statistics.util.InternalQueryResult.ResultRow;
@ -90,13 +89,11 @@ public class StatisticsUtil {
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
} finally {
ConnectContext.remove();
}
}
// TODO: finish this.
public static List<AnalysisJobInfo> deserializeToAnalysisJob(List<ResultRow> resultBatches) throws TException {
public static List<AnalysisTaskInfo> deserializeToAnalysisJob(List<ResultRow> resultBatches) throws TException {
return new ArrayList<>();
}

View File

@ -41,7 +41,7 @@ public class SqlModeTest {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql());
Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql());
parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_DEFAULT));
try {
@ -49,7 +49,7 @@ public class SqlModeTest {
} catch (Exception e) {
Assert.fail(e.getMessage());
}
Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql());
Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql());
}
@Test

View File

@ -28,7 +28,6 @@ import org.apache.doris.catalog.ColocateTableIndex;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
@ -96,7 +95,6 @@ public class TabletRepairAndBalanceTest {
static {
try {
InternalSchemaInitializer.forTest = true;
tag1 = Tag.create(Tag.TYPE_LOCATION, "zone1");
tag2 = Tag.create(Tag.TYPE_LOCATION, "zone2");
} catch (AnalysisException e) {
@ -106,6 +104,7 @@ public class TabletRepairAndBalanceTest {
@BeforeClass
public static void beforeClass() throws Exception {
FeConstants.runningUnitTest = true;
System.out.println(runningDir);
FeConstants.runningUnitTest = true;
FeConstants.tablet_checker_interval_ms = 1000;

View File

@ -21,7 +21,6 @@ import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.TabletInvertedIndex;
import org.apache.doris.common.Config;
@ -72,7 +71,7 @@ public class TabletReplicaTooSlowTest {
@BeforeClass
public static void beforeClass() throws Exception {
InternalSchemaInitializer.forTest = true;
FeConstants.runningUnitTest = true;
System.out.println(runningDir);
FeConstants.runningUnitTest = true;
FeConstants.tablet_checker_interval_ms = 1000;

View File

@ -27,6 +27,7 @@ import org.apache.doris.utframe.TestWithFeService;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
@ -38,6 +39,16 @@ public class DecommissionBackendTest extends TestWithFeService {
return 3;
}
@Override
protected void beforeCluster() {
FeConstants.runningUnitTest = true;
}
@BeforeAll
public void beforeClass() {
FeConstants.runningUnitTest = true;
}
@Override
protected void beforeCreatingConnectContext() throws Exception {
FeConstants.default_scheduler_interval_millisecond = 1000;

View File

@ -17,9 +17,18 @@
package org.apache.doris.nereids.datasets.ssb;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase;
import org.junit.jupiter.api.BeforeAll;
public abstract class SSBTestBase extends AnalyzeCheckTestBase {
@BeforeAll
public void beforeClass() {
FeConstants.runningUnitTest = true;
}
@Override
protected void runBeforeAll() throws Exception {
createDatabase("test");

View File

@ -1,103 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
import org.apache.doris.statistics.util.BlockingCounter;
import org.apache.doris.utframe.TestWithFeService;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.BlockingQueue;
public class AnalysisJobExecutorTest extends TestWithFeService {
@Mocked
AnalysisJobScheduler analysisJobScheduler;
@Override
protected void runBeforeAll() throws Exception {
try {
InternalSchemaInitializer.createDB();
createDatabase("analysis_job_test");
connectContext.setDatabase("default_cluster:analysis_job_test");
createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
+ "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
+ "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n"
+ ");");
InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer();
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
public void testExpiredJobCancellation() throws Exception {
AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler);
BlockingQueue<AnalysisJobWrapper> b = Deencapsulation.getField(analysisJobExecutor, "jobQueue");
AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
"internal",
"default_cluster:analysis_job_test",
"t1",
"col1", JobType.MANUAL,
ScheduleType.ONCE);
AnalysisJob analysisJob = new AnalysisJob(analysisJobScheduler, analysisJobInfo);
AnalysisJobWrapper analysisJobWrapper = new AnalysisJobWrapper(analysisJobExecutor, analysisJob);
Deencapsulation.setField(analysisJobWrapper, "startTime", 5);
b.put(analysisJobWrapper);
new Expectations() {
{
analysisJobWrapper.cancel();
times = 1;
}
};
analysisJobExecutor.start();
BlockingCounter counter = Deencapsulation.getField(analysisJobExecutor, "blockingCounter");
Assertions.assertEquals(0, counter.getVal());
}
@Test
public void testJobExecution() throws Exception {
AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler);
AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
"internal",
"default_cluster:analysis_job_test",
"t1",
"col1", JobType.MANUAL,
ScheduleType.ONCE);
AnalysisJob job = new AnalysisJob(analysisJobScheduler, analysisJobInfo);
new Expectations() {
{
analysisJobScheduler.getPendingJobs();
result = job;
job.execute();
times = 1;
}
};
Deencapsulation.invoke(analysisJobExecutor, "doFetchAndExecute");
}
}

View File

@ -22,8 +22,9 @@ import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisJobInfo.JobType;
import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.utframe.TestWithFeService;
@ -54,10 +55,10 @@ public class AnalysisJobTest extends TestWithFeService {
}
@Test
public void testCreateAnalysisJob(@Mocked AnalysisJobScheduler scheduler) throws Exception {
public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception {
new Expectations() {
{
scheduler.schedule((AnalysisJobInfo) any);
scheduler.schedule((AnalysisTaskInfo) any);
times = 3;
}
};
@ -86,7 +87,7 @@ public class AnalysisJobTest extends TestWithFeService {
}
@Test
public void testJobExecution(@Mocked AnalysisJobScheduler scheduler, @Mocked StmtExecutor stmtExecutor)
public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, @Mocked StmtExecutor stmtExecutor)
throws Exception {
new MockUp<StatisticsUtil>() {
@ -105,13 +106,12 @@ public class AnalysisJobTest extends TestWithFeService {
times = 2;
}
};
AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0,
"internal",
"default_cluster:analysis_job_test",
"t1",
"col1", JobType.MANUAL,
ScheduleType.ONCE);
new AnalysisJob(scheduler, analysisJobInfo).execute();
AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
AnalysisType.COLUMN)
.build();
new OlapAnalysisTask(scheduler, analysisJobInfo).execute();
}
}

View File

@ -0,0 +1,112 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
import org.apache.doris.statistics.util.BlockingCounter;
import org.apache.doris.utframe.TestWithFeService;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.BlockingQueue;
public class AnalysisTaskExecutorTest extends TestWithFeService {
@Mocked
AnalysisTaskScheduler analysisTaskScheduler;
@Override
protected void runBeforeAll() throws Exception {
try {
InternalSchemaInitializer.createDB();
createDatabase("analysis_job_test");
connectContext.setDatabase("default_cluster:analysis_job_test");
createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
+ "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n"
+ "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n"
+ ");");
InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer();
Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
public void testExpiredJobCancellation() throws Exception {
AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
BlockingQueue<AnalysisTaskWrapper> b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue");
AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
AnalysisType.COLUMN)
.build();
OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisTaskScheduler, analysisJobInfo);
AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob);
Deencapsulation.setField(analysisTaskWrapper, "startTime", 5);
b.put(analysisTaskWrapper);
new Expectations() {
{
analysisTaskWrapper.cancel();
times = 1;
}
};
analysisTaskExecutor.start();
BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter");
Assertions.assertEquals(0, counter.getVal());
}
@Test
public void testTaskExecution() throws Exception {
AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler);
AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0)
.setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1")
.setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
AnalysisType.COLUMN)
.build();
OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskScheduler, analysisTaskInfo);
new MockUp<AnalysisTaskScheduler>() {
@Mock
public synchronized BaseAnalysisTask getPendingTasks() {
return task;
}
};
new MockUp<AnalysisManager>() {
@Mock
public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {}
};
new Expectations() {
{
task.execute();
times = 1;
}
};
Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute");
}
}

View File

@ -0,0 +1,85 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.utframe.TestWithFeService;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mock;
import mockit.MockUp;
import mockit.Tested;
import org.junit.jupiter.api.Test;
public class MVStatisticsTest extends TestWithFeService {
@Injectable
StatisticsCache statisticsCache;
@Override
protected void runBeforeAll() throws Exception {
createDatabase("test");
connectContext.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ":" + "test");
createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n"
+ "DISTRIBUTED BY HASH(col3)\n"
+ "BUCKETS 1\n"
+ "PROPERTIES(\n"
+ " \"replication_num\"=\"1\"\n"
+ ");\n");
createMv("CREATE MATERIALIZED VIEW mv1 AS SELECT col3 , SUM(COL2) FROM t1 group by col3");
}
@Tested
@Test
public void testCreate() throws Exception {
new Expectations() {
{
statisticsCache.refreshSync(anyLong, anyString);
times = 5;
}
};
new MockUp<StatisticsRepository>() {
};
new MockUp<StatisticsUtil>() {
@Mock
public void execUpdate(String sql) throws Exception {}
};
new MockUp<OlapAnalysisTask>(OlapAnalysisTask.class) {
@Mock
public void execSQL(String sql) throws Exception {}
};
new MockUp<Env>() {
@Mock
public StatisticsCache getStatisticsCache() {
return statisticsCache;
}
};
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
Deencapsulation.setField(analysisManager, "statisticsCache", statisticsCache);
getSqlStmtExecutor("analyze t1");
Thread.sleep(3000);
}
}

View File

@ -41,7 +41,6 @@ import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.InternalSchemaInitializer;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
@ -121,13 +120,16 @@ public abstract class TestWithFeService {
@BeforeAll
public final void beforeAll() throws Exception {
InternalSchemaInitializer.forTest = true;
beforeCreatingConnectContext();
connectContext = createDefaultCtx();
beforeCluster();
createDorisCluster();
runBeforeAll();
}
protected void beforeCluster() {
}
@AfterAll
public final void afterAll() throws Exception {
runAfterAll();