diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 68d209ffe6..0735311f09 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -2619,7 +2619,7 @@ show_create_routine_load_stmt ::=
 // analyze statment
 analyze_stmt ::=
-    KW_ANALYZE opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
+    KW_ANALYZE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
     {:
         RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties);
     :}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
index f99e172c56..f5ea7fabb1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
-import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
@@ -39,14 +38,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -73,198 +70,69 @@ public class AnalyzeStmt extends DdlStmt {
 
     private static final Predicate<Long> DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 0L;
 
-    private final TableName optTableName;
+    public final boolean wholeTbl;
+
+    private final TableName tableName;
+
+    private TableIf table;
+
     private final PartitionNames optPartitionNames;
 
     private List<String> optColumnNames;
     private Map<String, String> optProperties;
 
     // after analyzed
     private long dbId;
-    private final Set<Long> tblIds = Sets.newHashSet();
+    private final List<String> partitionNames = Lists.newArrayList();
 
-    // TODO(wzt): support multiple tables
-    public AnalyzeStmt(TableName optTableName,
+    public AnalyzeStmt(TableName tableName,
             List<String> optColumnNames,
             PartitionNames optPartitionNames,
             Map<String, String> optProperties) {
-        this.optTableName = optTableName;
+        this.tableName = tableName;
         this.optColumnNames = optColumnNames;
         this.optPartitionNames = optPartitionNames;
+        wholeTbl = CollectionUtils.isEmpty(optColumnNames);
         this.optProperties = optProperties;
     }
 
-    public long getDbId() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The dbId must be obtained after the parsing is complete");
-        return dbId;
-    }
-
-    public Set<Long> getTblIds() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The tblIds must be obtained after the parsing is complete");
-        return tblIds;
-    }
-
-    public Database getDb() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The db must be obtained after the parsing is complete");
-        return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
-    }
-
-    public List<Table> getTables() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The tables must be obtained after the parsing is complete");
-        Database db = getDb();
-        List<Table> tables = Lists.newArrayList();
-
-        db.readLock();
-        try {
-            for (Long tblId : tblIds) {
-                Table table = db.getTableOrAnalysisException(tblId);
-                tables.add(table);
-            }
-        } finally {
-            db.readUnlock();
-        }
-
-        return tables;
-    }
-
-    public List<String> getPartitionNames() {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The partitionNames must be obtained after the parsing is complete");
-        return partitionNames;
-    }
-
-    /**
-     * The statistics task obtains partitions and then collects partition statistics,
-     * we need to filter out partitions that do not have data.
-     *
-     * @return map of tableId and partitionName
-     * @throws AnalysisException not analyzed
-     */
-    public Map<Long, List<String>> getTableIdToPartitionName() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The partitionIds must be obtained after the parsing is complete");
-        Map<Long, List<String>> tableIdToPartitionName = Maps.newHashMap();
-
-        for (Table table : getTables()) {
-            table.readLock();
-            try {
-                OlapTable olapTable = (OlapTable) table;
-                List<String> partitionNames = getPartitionNames();
-                List<String> newPartitionNames = new ArrayList<>(partitionNames);
-                if (newPartitionNames.isEmpty() && olapTable.isPartitioned()) {
-                    newPartitionNames.addAll(olapTable.getPartitionNames());
-                }
-                tableIdToPartitionName.put(table.getId(), newPartitionNames);
-            } finally {
-                table.readUnlock();
-            }
-        }
-        return tableIdToPartitionName;
-    }
-
-    public Map<Long, List<String>> getTableIdToColumnName() throws AnalysisException {
-        Preconditions.checkArgument(isAnalyzed(),
-                "The db name must be obtained after the parsing is complete");
-        Map<Long, List<String>> tableIdToColumnName = Maps.newHashMap();
-        List<Table> tables = getTables();
-        if (optColumnNames == null || optColumnNames.isEmpty()) {
-            for (Table table : tables) {
-                table.readLock();
-                try {
-                    long tblId = table.getId();
-                    List<Column> baseSchema = table.getBaseSchema();
-                    List<String> colNames = Lists.newArrayList();
-                    baseSchema.stream().map(Column::getName).forEach(colNames::add);
-                    tableIdToColumnName.put(tblId, colNames);
-                } finally {
-                    table.readUnlock();
-                }
-            }
-        } else {
-            for (Long tblId : tblIds) {
-                tableIdToColumnName.put(tblId, optColumnNames);
-            }
-        }
-
-        return tableIdToColumnName;
-    }
-
-    public Map<String, String> getProperties() {
-        return optProperties;
-    }
-
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         super.analyze(analyzer);
 
-        // step1: analyze db, table and column
-        if (optTableName != null) {
-            optTableName.analyze(analyzer);
+        tableName.analyze(analyzer);
 
-            String catalogName = optTableName.getCtl();
-            String dbName = optTableName.getDb();
-            String tblName = optTableName.getTbl();
-            CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
-            DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
-            TableIf table = db.getTableOrAnalysisException(tblName);
+        String catalogName = tableName.getCtl();
+        String dbName = tableName.getDb();
+        String tblName = tableName.getTbl();
+        CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName);
+        DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
+        table = db.getTableOrAnalysisException(tblName);
 
-            checkAnalyzePriv(dbName, tblName);
+        checkAnalyzePriv(dbName, tblName);
 
-            if (optColumnNames != null && !optColumnNames.isEmpty()) {
-                table.readLock();
-                try {
-                    List<String> baseSchema = table.getBaseSchema(false)
-                            .stream().map(Column::getName).collect(Collectors.toList());
-                    Optional<String> optional = optColumnNames.stream()
-                            .filter(entity -> !baseSchema.contains(entity)).findFirst();
-                    if (optional.isPresent()) {
-                        String columnName = optional.get();
-                        ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
-                                columnName, FeNameFormat.getColumnNameRegex());
-                    }
-                } finally {
-                    table.readUnlock();
-                }
-            } else {
-                optColumnNames = table.getBaseSchema(false)
-                        .stream().map(Column::getName).collect(Collectors.toList());
-            }
-
-            dbId = db.getId();
-            tblIds.add(table.getId());
-        } else {
-            // analyze the current default db
-            String dbName = analyzer.getDefaultDb();
-            if (Strings.isNullOrEmpty(dbName)) {
-                ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR);
-            }
-            Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
-
-            db.readLock();
+        if (optColumnNames != null && !optColumnNames.isEmpty()) {
+            table.readLock();
             try {
-                List<Table> tables = db.getTables();
-                for (Table table : tables) {
-                    checkAnalyzeType(table);
-                    checkAnalyzePriv(dbName, table.getName());
-                }
-
-                dbId = db.getId();
-                for (Table table : tables) {
-                    long tblId = table.getId();
-                    tblIds.add(tblId);
+                List<String> baseSchema = table.getBaseSchema(false)
+                        .stream().map(Column::getName).collect(Collectors.toList());
+                Optional<String> optional = optColumnNames.stream()
+                        .filter(entity -> !baseSchema.contains(entity)).findFirst();
+                if (optional.isPresent()) {
+                    String columnName = optional.get();
+                    ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME,
+                            columnName, FeNameFormat.getColumnNameRegex());
                 }
             } finally {
-                db.readUnlock();
+                table.readUnlock();
             }
+        } else {
+            optColumnNames = table.getBaseSchema(false)
+                    .stream().map(Column::getName).collect(Collectors.toList());
         }
-
+        dbId = db.getId();
         // step2: analyze partition
         checkPartitionNames();
-
         // step3: analyze properties
         checkProperties();
     }
@@ -286,18 +154,12 @@ public class AnalyzeStmt extends DdlStmt {
         }
     }
 
-    private void checkAnalyzeType(Table table) throws AnalysisException {
-        if (table.getType() != Table.TableType.OLAP) {
-            throw new AnalysisException("Only OLAP table statistics are supported");
-        }
-    }
-
     private void checkPartitionNames() throws AnalysisException {
         if (optPartitionNames != null) {
             optPartitionNames.analyze(analyzer);
-            if (optTableName != null) {
-                Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(optTableName.getDb());
-                OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(optTableName.getTbl());
+            if (tableName != null) {
+                Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tableName.getDb());
+                OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(tableName.getTbl());
                 if (!olapTable.isPartitioned()) {
                     throw new AnalysisException("Not a partitioned table: " + olapTable.getName());
                 }
@@ -340,9 +202,9 @@
         StringBuilder sb = new StringBuilder();
         sb.append("ANALYZE");
 
-        if (optTableName != null) {
+        if (tableName != null) {
             sb.append(" ");
-            sb.append(optTableName.toSql());
+            sb.append(tableName.toSql());
         }
 
         if (optColumnNames != null) {
@@ -369,18 +231,46 @@
     }
 
     public String getCatalogName() {
-        return optTableName.getCtl();
+        return tableName.getCtl();
     }
 
     public String getDBName() {
-        return optTableName.getDb();
+        return tableName.getDb();
     }
 
-    public String getTblName() {
-        return optTableName.getTbl();
+    public TableName getTblName() {
+        return tableName;
     }
 
     public List<String> getOptColumnNames() {
         return optColumnNames;
     }
+
+
+    public long getDbId() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The dbId must be obtained after the parsing is complete");
+        return dbId;
+    }
+
+    public Database getDb() throws AnalysisException {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The db must be obtained after the parsing is complete");
+        return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId);
+    }
+
+    public TableIf getTable() {
+        return table;
+    }
+
+    public List<String> getPartitionNames() {
+        Preconditions.checkArgument(isAnalyzed(),
+                "The partitionNames must be obtained after the parsing is complete");
+        return partitionNames;
+    }
+
+    public Map<String, String> getProperties() {
+        return optProperties;
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java
index 643da0095a..b757619867 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java @@ -132,4 +132,7 @@ public class SelectListItem { return expr.toColumnLabel(); } + public void setAlias(String alias) { + this.alias = alias; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 3f754595f1..b03ec0a691 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1833,18 +1833,26 @@ public class SelectStmt extends QueryStmt { if (selectList.isDistinct()) { strBuilder.append("DISTINCT "); } - for (int i = 0; i < resultExprs.size(); ++i) { - // strBuilder.append(selectList.getItems().get(i).toSql()); - // strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); - if (i != 0) { - strBuilder.append(", "); + ConnectContext ctx = ConnectContext.get(); + if (ctx == null || ctx.getSessionVariable().internalSession) { + for (int i = 0; i < selectList.getItems().size(); i++) { + strBuilder.append(selectList.getItems().get(i).toSql()); + strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); } - if (needToSql) { - strBuilder.append(originalExpr.get(i).toSql()); - } else { - strBuilder.append(resultExprs.get(i).toSql()); + } else { + for (int i = 0; i < resultExprs.size(); ++i) { + // strBuilder.append(selectList.getItems().get(i).toSql()); + // strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); + if (i != 0) { + strBuilder.append(", "); + } + if (needToSql) { + strBuilder.append(originalExpr.get(i).toSql()); + } else { + strBuilder.append(resultExprs.get(i).toSql()); + } + strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i))); } - strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i))); } // From clause diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 0aa5d59291..f5c37e5fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.StringJoiner; /** * Superclass of all table references, including references to views, base tables @@ -226,6 +227,13 @@ public class TableRef implements ParseNode, Writable { output.append("[").append(Joiner.on(", ").join(joinHints)).append("] "); } output.append(tableRefToSql()).append(" "); + if (partitionNames != null) { + StringJoiner sj = new StringJoiner(",", "", " "); + for (String partName : partitionNames.getPartitionNames()) { + sj.add(partName); + } + output.append(sj.toString()); + } if (usingColNames != null) { output.append("USING (").append(Joiner.on(", ").join(usingColNames)).append(")"); } else if (onClause != null) { @@ -864,4 +872,12 @@ public class TableRef implements ParseNode, Writable { aliases = new String[]{alias}; } } + + public void setPartitionNames(PartitionNames partitionNames) { + this.partitionNames = partitionNames; + } + + public void setName(TableName name) { + this.name = name; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f57f262526..e29713731f 100755 --- 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -208,12 +208,12 @@ import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.service.FrontendOptions; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.AnalysisTaskScheduler; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsJobManager; import org.apache.doris.statistics.StatisticsJobScheduler; import org.apache.doris.statistics.StatisticsManager; -import org.apache.doris.statistics.StatisticsRepository; import org.apache.doris.statistics.StatisticsTaskScheduler; import org.apache.doris.system.Backend; import org.apache.doris.system.FQDNManager; @@ -441,9 +441,7 @@ public class Env { private MTMVJobManager mtmvJobManager; - private final AnalysisJobScheduler analysisJobScheduler; - - private final StatisticsCache statisticsCache; + private AnalysisManager analysisManager; private ExternalMetaCacheMgr extMetaCacheMgr; @@ -647,10 +645,9 @@ public class Env { this.refreshManager = new RefreshManager(); this.policyMgr = new PolicyMgr(); this.mtmvJobManager = new MTMVJobManager(); - this.analysisJobScheduler = new AnalysisJobScheduler(); - this.statisticsCache = new StatisticsCache(); this.extMetaCacheMgr = new ExternalMetaCacheMgr(); this.fqdnManager = new FQDNManager(systemInfo); + this.analysisManager = new AnalysisManager(); } public static void destroyCheckpoint() { @@ -1653,7 +1650,7 @@ public class Env { } public StatisticsCache getStatisticsCache() { - return statisticsCache; + return analysisManager.getStatisticsCache(); } public boolean hasReplayer() { @@ -5233,8 +5230,8 @@ public class Env { return count; } - public AnalysisJobScheduler getAnalysisJobScheduler() { - return analysisJobScheduler; + public AnalysisTaskScheduler getAnalysisJobScheduler() { + return analysisManager.taskScheduler; } // TODO: @@ -5242,6 +5239,11 @@ public class Env { // 2. support sample job // 3. support period job public void createAnalysisJob(AnalyzeStmt analyzeStmt) { - StatisticsRepository.createAnalysisJob(analyzeStmt); + analysisManager.createAnalysisJob(analyzeStmt); } + + public AnalysisManager getAnalysisManager() { + return analysisManager; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 77294b3b76..8cafd7d777 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -44,24 +44,22 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); - public static boolean forTest = false; - /** * If internal table creation failed, will retry after below seconds. 
*/ public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1; - public void run() { - if (forTest) { + if (FeConstants.runningUnitTest) { return; } - while (true) { + while (!created()) { FrontendNodeType feType = Env.getCurrentEnv().getFeType(); if (feType.equals(FrontendNodeType.INIT) || feType.equals(FrontendNodeType.UNKNOWN)) { LOG.warn("FE is not ready"); @@ -72,7 +70,6 @@ public class InternalSchemaInitializer extends Thread { .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L); createDB(); createTbl(); - break; } catch (Throwable e) { LOG.warn("Statistics storage initiated failed, will try again later", e); } @@ -145,10 +142,12 @@ public class InternalSchemaInitializer extends Thread { FeConstants.INTERNAL_DB_NAME, StatisticConstants.ANALYSIS_JOB_TABLE); List columnDefs = new ArrayList<>(); columnDefs.add(new ColumnDef("job_id", TypeDef.create(PrimitiveType.BIGINT))); + columnDefs.add(new ColumnDef("task_id", TypeDef.create(PrimitiveType.BIGINT))); columnDefs.add(new ColumnDef("catalog_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("db_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024))); + columnDefs.add(new ColumnDef("index_id", TypeDef.create(PrimitiveType.BIGINT))); columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32))); columnDefs.add(new ColumnDef("analysis_type", TypeDef.createVarchar(32))); columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024))); @@ -175,4 +174,16 @@ public class InternalSchemaInitializer extends Thread { return createTableStmt; } + private boolean created() { + Optional optionalDatabase = + Env.getCurrentEnv().getInternalCatalog() + .getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME); + if (!optionalDatabase.isPresent()) { + return false; + } + Database db = optionalDatabase.get(); + return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent() + && db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 91cd500ddf..c43aef26eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,9 +47,12 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.MVAnalysisTask; +import org.apache.doris.statistics.OlapAnalysisTask; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TCompressionType; @@ -997,8 +1000,11 @@ public class OlapTable extends Table { } @Override - public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { - return new AnalysisJob(scheduler, info); + public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler 
scheduler, AnalysisTaskInfo info) {
+        if (info.analysisType.equals(AnalysisType.COLUMN)) {
+            return new OlapAnalysisTask(scheduler, info);
+        }
+        return new MVAnalysisTask(scheduler, info);
     }
 
     @Override
@@ -1939,4 +1945,5 @@
     public Set<Long> getPartitionKeys() {
         return idToPartition.keySet();
     }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
index 24c33bd837..56f02f9832 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java
@@ -26,9 +26,9 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.external.hudi.HudiTable;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import com.google.common.base.Preconditions;
@@ -508,7 +508,7 @@
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 361ef44ba2..6aebe1feb3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -20,9 +20,9 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.alter.AlterCancelException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import java.util.Collections;
@@ -111,7 +111,7 @@ public interface TableIf {
 
     TTableDescriptor toThrift();
 
-    AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info);
+    BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info);
 
     /**
      * Doris table type.
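Editor's note: the hunks above and below all apply one refactoring, turning TableIf.createAnalysisTask into a factory method that each table implementation overrides, while the abstract bases refuse with NotImplementedException. The following minimal sketch shows only the shape of that dispatch; every name in it is an invented stand-in for illustration, not actual Doris code.

// Invented stand-ins illustrating the createAnalysisTask factory pattern.
interface AnalysisTask {
    void execute();
}

enum AnalysisType { COLUMN, INDEX }

abstract class TableBase {
    // Mirrors the base-class behavior in the patch: tables that cannot be analyzed refuse.
    AnalysisTask createAnalysisTask(AnalysisType type) {
        throw new UnsupportedOperationException("statistics not supported for " + getClass().getSimpleName());
    }
}

class OlapLikeTable extends TableBase {
    @Override
    AnalysisTask createAnalysisTask(AnalysisType type) {
        // COLUMN requests produce a column-stats task; anything else takes the
        // materialized-view path, matching the OlapTable override above.
        if (type == AnalysisType.COLUMN) {
            return () -> System.out.println("collect column statistics");
        }
        return () -> System.out.println("collect materialized-view statistics");
    }
}

public class CreateTaskFactoryDemo {
    public static void main(String[] args) {
        TableBase table = new OlapLikeTable();
        table.createAnalysisTask(AnalysisType.COLUMN).execute();
    }
}

The scheduler never needs to know the concrete task class; it only asks the table for a task and queues it, which is what lets HMSExternalTable plug in Hive and Iceberg variants in the next file.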
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
index eeca74ff5c..c4cb6aad00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java
@@ -29,9 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
 
 import com.google.gson.annotations.SerializedName;
@@ -301,7 +301,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index bfefe1f829..d3e991146f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -22,11 +22,11 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.PooledHiveMetaStoreClient;
-import org.apache.doris.statistics.AnalysisJob;
-import org.apache.doris.statistics.AnalysisJobInfo;
-import org.apache.doris.statistics.AnalysisJobScheduler;
-import org.apache.doris.statistics.HiveAnalysisJob;
-import org.apache.doris.statistics.IcebergAnalysisJob;
+import org.apache.doris.statistics.AnalysisTaskInfo;
+import org.apache.doris.statistics.AnalysisTaskScheduler;
+import org.apache.doris.statistics.BaseAnalysisTask;
+import org.apache.doris.statistics.HiveAnalysisTask;
+import org.apache.doris.statistics.IcebergAnalysisTask;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -268,13 +268,13 @@
     }
 
     @Override
-    public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
         makeSureInitialized();
         switch (dlaType) {
             case HIVE:
-                return new HiveAnalysisJob(scheduler, info);
+                return new HiveAnalysisTask(scheduler, info);
             case ICEBERG:
-                return new IcebergAnalysisJob(scheduler, info);
+                return new IcebergAnalysisTask(scheduler, info);
             default:
                 throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
         }
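Editor's note on the StmtExecutor hunk that follows: executeInternalQuery() now allocates its result list up front and returns it, possibly empty, on every failure path instead of returning null. A small sketch of that convention, with invented names, assuming the caller simply iterates the result:

import java.util.ArrayList;
import java.util.List;

public class EmptyResultDemo {
    // Returns an empty list rather than null on failure, so callers need no null check.
    static List<String> runInternalQuery(boolean fail) {
        List<String> resultRows = new ArrayList<>();
        try {
            if (fail) {
                throw new IllegalStateException("analysis failed");
            }
            resultRows.add("row1");
        } catch (Exception e) {
            // Log and fall through: the pre-allocated (empty) list is still returned.
            System.err.println("internal SQL failed: " + e.getMessage());
        }
        return resultRows;
    }

    public static void main(String[] args) {
        for (String row : runInternalQuery(true)) { // never throws NullPointerException
            System.out.println(row);
        }
    }
}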
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 7a83d8db8d..38503d1263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -1791,12 +1791,13 @@ public class StmtExecutor implements ProfileWriter {
     public List<ResultRow> executeInternalQuery() {
         try {
+            List<ResultRow> resultRows = new ArrayList<>();
             analyzer = new Analyzer(context.getEnv(), context);
             try {
                 analyze(context.getSessionVariable().toThrift());
             } catch (UserException e) {
                 LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e);
-                return null;
+                return resultRows;
             }
             planner.getFragments();
             RowBatch batch;
@@ -1821,7 +1822,6 @@
             }
             Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result")
                     .setParent(Context.current()).startSpan();
-            List<ResultRow> resultRows = new ArrayList<>();
             try (Scope scope = fetchResultSpan.makeCurrent()) {
                 while (true) {
                     batch = coord.getNext();
@@ -1834,7 +1834,7 @@
             } catch (Exception e) {
                 LOG.warn("Unexpected exception when SQL running", e);
                 fetchResultSpan.recordException(e);
-                return null;
+                return resultRows;
             } finally {
                 fetchResultSpan.end();
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
deleted file mode 100644
index 06fb36e644..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java
+++ /dev/null
@@ -1,207 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.FeConstants; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.commons.text.StringSubstitutor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class AnalysisJob { - - private final AnalysisJobScheduler analysisJobScheduler; - - protected final AnalysisJobInfo info; - - protected CatalogIf catalog; - - protected DatabaseIf db; - - protected TableIf tbl; - - protected Column col; - - protected StmtExecutor stmtExecutor; - - public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - this.analysisJobScheduler = analysisJobScheduler; - this.info = info; - init(info); - } - - private void init(AnalysisJobInfo info) { - catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName); - if (catalog == null) { - analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED, - String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis()); - return; - } - db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null); - if (db == null) { - analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED, - String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis()); - return; - } - tbl = (TableIf) db.getTable(info.tblName).orElse(null); - if (tbl == null) { - analysisJobScheduler.updateJobStatus( - info.jobId, JobState.FAILED, - String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis()); - } - col = tbl.getColumn(info.colName); - if (col == null) { - analysisJobScheduler.updateJobStatus( - info.jobId, JobState.FAILED, String.format("Column with name %s not exists", info.tblName), - System.currentTimeMillis()); - } - } - - private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " - + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(${colName}) AS ndv, " - + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "MIN(${colName}) AS min, " - + "MAX(${colName}) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW()" - + "FROM `${dbName}`.`${tblName}` " - + "PARTITION ${partName}"; - - private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, " - + " ndv, null_count, min, max, data_size, update_time\n" - + " FROM \n" - + " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, " - + " ${catalogId} AS catalog_id, " - + " ${dbId} AS db_id, " - + " ${tblId} AS tbl_id, " - + " '${colId}' AS col_id, " - + " NULL AS part_id, " - + " SUM(count) AS row_count, \n" - + " SUM(null_count) AS null_count, " - + " MIN(CAST(min AS ${type})) AS min, " - + " 
MAX(CAST(max AS ${type})) AS max, " - + " SUM(data_size_in_bytes) AS data_size, " - + " NOW() AS update_time\n" - + " FROM ${internalDB}.${columnStatTbl}" - + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND " - + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND " - + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND " - + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL" - + " ) t1, \n" - + " (SELECT NDV(${colName}) AS ndv FROM `${dbName}`.`${tblName}`) t2\n"; - - private String getDataSizeFunction() { - if (col.getType().isStringType()) { - return "SUM(LENGTH(${colName}))"; - } - return "COUNT(1) * " + col.getType().getSlotSize(); - } - - public void execute() throws Exception { - Map params = new HashMap<>(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("catalogId", String.valueOf(catalog.getId())); - params.put("dbId", String.valueOf(db.getId())); - params.put("tblId", String.valueOf(tbl.getId())); - params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction()); - params.put("dbName", info.dbName); - params.put("colName", String.valueOf(info.colName)); - params.put("tblName", String.valueOf(info.tblName)); - List partitionAnalysisSQLs = new ArrayList<>(); - try { - tbl.readLock(); - Set partNames = ((Table) tbl).getPartitionNames(); - for (String partName : partNames) { - Partition part = ((Table) tbl).getPartition(partName); - if (part == null) { - continue; - } - params.put("partId", String.valueOf(((Table) tbl).getPartition(partName).getId())); - params.put("partName", String.valueOf(partName)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); - } - } finally { - tbl.readUnlock(); - } - for (String sql : partitionAnalysisSQLs) { - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - } - } - params.remove("partId"); - params.put("type", col.getType().toString()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); - } - } - - public int getLastExecTime() { - return info.lastExecTimeInMs; - } - - public void cancel() { - if (stmtExecutor != null) { - stmtExecutor.cancel(); - } - analysisJobScheduler - .updateJobStatus(info.jobId, JobState.FAILED, - String.format("Job has been cancelled: %s", info.toString()), -1); - } - - public void updateState(JobState jobState) { - info.updateState(jobState); - } - - public long getJobId() { - return info.jobId; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java new file mode 100644 index 0000000000..3833acc20f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.FeConstants; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class AnalysisManager { + + private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); + + private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE " + + FeConstants.INTERNAL_DB_NAME + "." 
+            StatisticConstants.ANALYSIS_JOB_TABLE + " "
+            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+
+    private final ConcurrentMap<Long, Map<Long, AnalysisTaskInfo>> analysisJobIdToTaskMap;
+
+    public final AnalysisTaskScheduler taskScheduler;
+
+    private StatisticsCache statisticsCache;
+
+    private final AnalysisTaskExecutor taskExecutor;
+
+    public AnalysisManager() {
+        analysisJobIdToTaskMap = new ConcurrentHashMap<>();
+        this.taskScheduler = new AnalysisTaskScheduler();
+        taskExecutor = new AnalysisTaskExecutor(taskScheduler);
+        this.statisticsCache = new StatisticsCache();
+        taskExecutor.start();
+    }
+
+    public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+        String catalogName = analyzeStmt.getCatalogName();
+        String db = analyzeStmt.getDBName();
+        TableName tbl = analyzeStmt.getTblName();
+        StatisticsUtil.convertTableNameToObjects(tbl);
+        List<String> colNames = analyzeStmt.getOptColumnNames();
+        Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
+        long jobId = Env.getCurrentEnv().getNextId();
+        if (colNames != null) {
+            for (String colName : colNames) {
+                long taskId = Env.getCurrentEnv().getNextId();
+                AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+                        .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+                        .setTblName(tbl.getTbl()).setColName(colName).setJobType(JobType.MANUAL)
+                        .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.COLUMN)
+                        .setScheduleType(ScheduleType.ONCE).build();
+                try {
+                    StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to create analysis job", e);
+                }
+                analysisTaskInfos.put(taskId, analysisTaskInfo);
+            }
+        }
+        if (analyzeStmt.wholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
+            OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+            try {
+                olapTable.readLock();
+                for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
+                    if (meta.getDefineStmt() == null) {
+                        continue;
+                    }
+                    long taskId = Env.getCurrentEnv().getNextId();
+                    AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+                            jobId).setTaskId(taskId)
+                            .setCatalogName(catalogName).setDbName(db)
+                            .setTblName(tbl.getTbl()).setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
+                            .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+                            .setScheduleType(ScheduleType.ONCE).build();
+                    try {
+                        StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+                    } catch (Exception e) {
+                        throw new RuntimeException("Failed to create analysis job", e);
+                    }
+                    analysisTaskInfos.put(taskId, analysisTaskInfo);
+                }
+            } finally {
+                olapTable.readUnlock();
+            }
+        }
+        analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+        analysisTaskInfos.values().forEach(taskScheduler::schedule);
+    }
+
+    public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
+        Map<String, String> params = new HashMap<>();
+        params.put("jobState", jobState.toString());
+        params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
+        params.put("updateExecTime", time == -1 ?
"" : ", last_exec_time_in_ms=" + time); + params.put("jobId", String.valueOf(info.jobId)); + try { + StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); + } catch (Exception e) { + LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e); + } finally { + info.state = jobState; + if (analysisJobIdToTaskMap.get(info.jobId).values() + .stream().allMatch(i -> i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) { + analysisJobIdToTaskMap.remove(info.jobId); + } + + } + } + + public StatisticsCache getStatisticsCache() { + return statisticsCache; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java new file mode 100644 index 0000000000..bab8a462e8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +public enum AnalysisState { + PENDING, + RUNNING, + FINISHED, + FAILED; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java similarity index 75% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index c73bf349b8..ff98890cf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -17,10 +17,10 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; import org.apache.doris.statistics.util.BlockingCounter; import org.apache.logging.log4j.LogManager; @@ -33,9 +33,9 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class AnalysisJobExecutor extends Thread { +public class AnalysisTaskExecutor extends Thread { - private static final Logger LOG = LogManager.getLogger(AnalysisJobExecutor.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class); private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool( Config.statistics_simultaneously_running_job_num, @@ -44,17 +44,17 @@ public class AnalysisJobExecutor extends Thread { new BlockedPolicy("Analysis Job Executor", 
Integer.MAX_VALUE),
             "Analysis Job Executor", true);
 
-    private final AnalysisJobScheduler jobScheduler;
+    private final AnalysisTaskScheduler taskScheduler;
 
     private final BlockingCounter blockingCounter =
             new BlockingCounter(Config.statistics_simultaneously_running_job_num);
 
-    private final BlockingQueue<AnalysisJobWrapper> jobQueue =
-            new PriorityBlockingQueue<AnalysisJobWrapper>(20,
-                    Comparator.comparingLong(AnalysisJobWrapper::getStartTime));
+    private final BlockingQueue<AnalysisTaskWrapper> jobQueue =
+            new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
+                    Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));
 
-    public AnalysisJobExecutor(AnalysisJobScheduler jobExecutor) {
-        this.jobScheduler = jobExecutor;
+    public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) {
+        this.taskScheduler = jobExecutor;
     }
 
     @Override
@@ -73,12 +73,12 @@
     private void doCancelExpiredJob() {
         for (;;) {
             try {
-                AnalysisJobWrapper jobWrapper = jobQueue.take();
+                AnalysisTaskWrapper taskWrapper = jobQueue.take();
                 try {
                     long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS;
-                    jobWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
+                    taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
                 } catch (Exception e) {
-                    jobWrapper.cancel();
+                    taskWrapper.cancel();
                 }
             } catch (Throwable throwable) {
                 LOG.warn(throwable);
@@ -101,11 +101,13 @@
     }
 
     private void doFetchAndExecute() {
-        AnalysisJob job = jobScheduler.getPendingJobs();
-        AnalysisJobWrapper jobWrapper = new AnalysisJobWrapper(this, job);
+        BaseAnalysisTask task = taskScheduler.getPendingTasks();
+        AnalysisTaskWrapper jobWrapper = new AnalysisTaskWrapper(this, task);
         incr();
-        jobScheduler.updateJobStatus(job.getJobId(), JobState.RUNNING, "", -1);
         executors.submit(jobWrapper);
+        Env.getCurrentEnv().getAnalysisManager()
+                .updateTaskStatus(task.info,
+                        AnalysisState.RUNNING, "", System.currentTimeMillis());
     }
 
     public void decr() {
@@ -116,7 +118,7 @@
         blockingCounter.incr();
     }
 
-    public void putJob(AnalysisJobWrapper wrapper) throws Exception {
+    public void putJob(AnalysisTaskWrapper wrapper) throws Exception {
         jobQueue.put(wrapper);
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
similarity index 65%
rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java
rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
index da8fa4cede..6c2243b261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java
@@ -20,23 +20,21 @@ package org.apache.doris.statistics;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
-import java.util.Objects;
 import java.util.StringJoiner;
 
-public class AnalysisJobInfo {
+public class AnalysisTaskInfo {
 
-    private static final Logger LOG = LogManager.getLogger(AnalysisJobInfo.class);
+    private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class);
 
-    public enum JobState {
-        PENDING,
-        RUNNING,
-        FINISHED,
-        FAILED;
+
+    public enum AnalysisMethod {
+        SAMPLE,
+        FULL
     }
 
     public enum AnalysisType {
-        SAMPLE,
-        FULL;
+        COLUMN,
+        INDEX
     }
 
     public enum JobType {
@@ -53,6 +51,8 @@ public class AnalysisJobInfo {
 
     public final long jobId;
 
+    public final long taskId;
+
     public final String catalogName;
 
     public final
String dbName; @@ -61,56 +61,43 @@ public class AnalysisJobInfo { public final String colName; + public final Long indexId; + public final JobType jobType; - public AnalysisType analysisType; + public final AnalysisMethod analysisMethod; + + public final AnalysisType analysisType; public String message; // finished or failed public int lastExecTimeInMs = 0; - private JobState state; + public AnalysisState state; public final ScheduleType scheduleType; - public AnalysisJobInfo(long jobId, String catalogName, String dbName, String tblName, String colName, - JobType jobType, ScheduleType scheduleType) { + public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, + String colName, Long indexId, JobType jobType, + AnalysisMethod analysisMethod, AnalysisType analysisType, String message, int lastExecTimeInMs, + AnalysisState state, ScheduleType scheduleType) { this.jobId = jobId; + this.taskId = taskId; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; this.colName = colName; + this.indexId = indexId; this.jobType = jobType; + this.analysisMethod = analysisMethod; + this.analysisType = analysisType; + this.message = message; + this.lastExecTimeInMs = lastExecTimeInMs; + this.state = state; this.scheduleType = scheduleType; } - @Override - public int hashCode() { - return Objects.hash(catalogName, dbName, tblName, colName, analysisType); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - AnalysisJobInfo other = (AnalysisJobInfo) obj; - return catalogName.equals(other.catalogName) - && dbName.equals(other.dbName) - && tblName.equals(other.tblName) - && colName.equals(other.colName) - && analysisType.equals(other.analysisType); - } - - // TODO: log to meta - public void updateState(JobState jobState) { - this.state = jobState; - } - @Override public String toString() { StringJoiner sj = new StringJoiner("\n", getClass().getName() + ":\n", "\n"); @@ -119,14 +106,15 @@ public class AnalysisJobInfo { sj.add("DBName: " + dbName); sj.add("TableName: " + tblName); sj.add("ColumnName: " + colName); - sj.add("JobType: " + analysisType.toString()); + sj.add("TaskType: " + analysisType.toString()); + sj.add("TaskMethod: " + analysisMethod.toString()); sj.add("Message: " + message); sj.add("LastExecTime: " + String.valueOf(lastExecTimeInMs)); sj.add("CurrentState: " + state.toString()); return sj.toString(); } - public JobState getState() { + public AnalysisState getState() { return state; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java new file mode 100644 index 0000000000..cc3b7b62f1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; + +public class AnalysisTaskInfoBuilder { + private long jobId; + private long taskId; + private String catalogName; + private String dbName; + private String tblName; + private String colName; + private Long indexId; + private JobType jobType; + private AnalysisMethod analysisMethod; + private AnalysisType analysisType; + private String message; + private int lastExecTimeInMs; + private AnalysisState state; + private ScheduleType scheduleType; + + public AnalysisTaskInfoBuilder setJobId(long jobId) { + this.jobId = jobId; + return this; + } + + public AnalysisTaskInfoBuilder setTaskId(long taskId) { + this.taskId = taskId; + return this; + } + + public AnalysisTaskInfoBuilder setCatalogName(String catalogName) { + this.catalogName = catalogName; + return this; + } + + public AnalysisTaskInfoBuilder setDbName(String dbName) { + this.dbName = dbName; + return this; + } + + public AnalysisTaskInfoBuilder setTblName(String tblName) { + this.tblName = tblName; + return this; + } + + public AnalysisTaskInfoBuilder setColName(String colName) { + this.colName = colName; + return this; + } + + public AnalysisTaskInfoBuilder setIndexId(Long indexId) { + this.indexId = indexId; + return this; + } + + public AnalysisTaskInfoBuilder setJobType(JobType jobType) { + this.jobType = jobType; + return this; + } + + public AnalysisTaskInfoBuilder setAnalysisMethod(AnalysisMethod analysisMethod) { + this.analysisMethod = analysisMethod; + return this; + } + + public AnalysisTaskInfoBuilder setAnalysisType(AnalysisType analysisType) { + this.analysisType = analysisType; + return this; + } + + public AnalysisTaskInfoBuilder setMessage(String message) { + this.message = message; + return this; + } + + public AnalysisTaskInfoBuilder setLastExecTimeInMs(int lastExecTimeInMs) { + this.lastExecTimeInMs = lastExecTimeInMs; + return this; + } + + public AnalysisTaskInfoBuilder setState(AnalysisState state) { + this.state = state; + return this; + } + + public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) { + this.scheduleType = scheduleType; + return this; + } + + public AnalysisTaskInfo build() { + return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colName, indexId, jobType, + analysisMethod, analysisType, message, lastExecTimeInMs, state, scheduleType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java similarity index 54% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java index 0918339d35..71f23fe955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java
@@ -20,94 +20,63 @@ package org.apache.doris.statistics;
 
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.datasource.CatalogIf;
-import org.apache.doris.statistics.AnalysisJobInfo.JobState;
-import org.apache.doris.statistics.AnalysisJobInfo.JobType;
-import org.apache.doris.statistics.util.StatisticsUtil;
+import org.apache.doris.statistics.AnalysisTaskInfo.JobType;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.text.StringSubstitutor;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
-public class AnalysisJobScheduler {
+public class AnalysisTaskScheduler {
 
-    private static final Logger LOG = LogManager.getLogger(AnalysisJobScheduler.class);
+    private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class);
 
-    private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
-            + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
-            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+    private final PriorityQueue<BaseAnalysisTask> systemJobQueue =
+            new PriorityQueue<>(Comparator.comparingInt(BaseAnalysisTask::getLastExecTime));
 
-    private final PriorityQueue<AnalysisJob> systemJobQueue =
-            new PriorityQueue<AnalysisJob>(Comparator.comparingInt(AnalysisJob::getLastExecTime));
+    private final Queue<BaseAnalysisTask> manualJobQueue = new LinkedList<>();
 
-    private final Queue<AnalysisJob> manualJobQueue = new LinkedList<>();
+    private final Set<BaseAnalysisTask> systemJobSet = new HashSet<>();
 
-    private final Set<AnalysisJob> systemJobSet = new HashSet<>();
+    private final Set<BaseAnalysisTask> manualJobSet = new HashSet<>();
 
-    private final Set<AnalysisJob> manualJobSet = new HashSet<>();
-
-    private final AnalysisJobExecutor jobExecutor = new AnalysisJobExecutor(this);
-
-    {
-        jobExecutor.start();
-    }
-
-    public void updateJobStatus(long jobId, JobState jobState, String message, long time) {
-        Map<String, String> params = new HashMap<>();
-        params.put("jobState", jobState.toString());
-        params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
-        params.put("updateExecTime", time == -1 ?
"" : ", last_exec_time_in_ms=" + time); - params.put("jobId", String.valueOf(jobId)); - try { - StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); - } catch (Exception e) { - LOG.warn(String.format("Failed to update state for job: %s", jobId), e); - } - - } - - public synchronized void scheduleJobs(List analysisJobInfos) { - for (AnalysisJobInfo job : analysisJobInfos) { + public synchronized void scheduleJobs(List analysisJobInfos) { + for (AnalysisTaskInfo job : analysisJobInfos) { schedule(job); } } - public synchronized void schedule(AnalysisJobInfo analysisJobInfo) { + public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName); Preconditions.checkArgument(catalog != null); DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName); Preconditions.checkArgument(db != null); TableIf table = db.getTableNullable(analysisJobInfo.tblName); Preconditions.checkArgument(table != null); - AnalysisJob analysisJob = table.createAnalysisJob(this, analysisJobInfo); - addToManualJobQueue(analysisJob); + BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo); + addToManualJobQueue(analysisTask); if (analysisJobInfo.jobType.equals(JobType.MANUAL)) { return; } - addToSystemQueue(analysisJob); + addToSystemQueue(analysisTask); } - private void removeFromSystemQueue(AnalysisJob analysisJobInfo) { + private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) { if (manualJobSet.contains(analysisJobInfo)) { systemJobQueue.remove(analysisJobInfo); manualJobSet.remove(analysisJobInfo); } } - private void addToSystemQueue(AnalysisJob analysisJobInfo) { + private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) { if (systemJobSet.contains(analysisJobInfo)) { return; } @@ -116,7 +85,7 @@ public class AnalysisJobScheduler { notify(); } - private void addToManualJobQueue(AnalysisJob analysisJobInfo) { + private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) { if (manualJobSet.contains(analysisJobInfo)) { return; } @@ -125,7 +94,7 @@ public class AnalysisJobScheduler { notify(); } - public synchronized AnalysisJob getPendingJobs() { + public synchronized BaseAnalysisTask getPendingTasks() { while (true) { if (!manualJobQueue.isEmpty()) { return manualJobQueue.poll(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 244a53b187..3ca55dbd9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -18,60 +18,66 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.FutureTask; -public class AnalysisJobWrapper extends FutureTask { +public class AnalysisTaskWrapper extends FutureTask { - private static final Logger LOG = LogManager.getLogger(AnalysisJobWrapper.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskWrapper.class); - private final AnalysisJob job; + 
private final BaseAnalysisTask task; private long startTime; - private final AnalysisJobExecutor executor; + private final AnalysisTaskExecutor executor; - public AnalysisJobWrapper(AnalysisJobExecutor executor, AnalysisJob job) { + public AnalysisTaskWrapper(AnalysisTaskExecutor executor, BaseAnalysisTask job) { super(() -> { job.execute(); return null; }); this.executor = executor; - this.job = job; + this.task = job; } @Override public void run() { startTime = System.currentTimeMillis(); - Exception except = null; + Throwable except = null; try { executor.putJob(this); super.run(); + Object result = get(); + if (result instanceof Throwable) { + except = (Throwable) result; + } } catch (Exception e) { except = e; } finally { executor.decr(); if (except != null) { - Env.getCurrentEnv().getAnalysisJobScheduler() - .updateJobStatus(job.getJobId(), JobState.FAILED, except.getMessage(), -1); + LOG.warn("Failed to execute task", except); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FAILED, except.getMessage(), -1); } else { - Env.getCurrentEnv().getAnalysisJobScheduler() - .updateJobStatus(job.getJobId(), JobState.FINISHED, "", System.currentTimeMillis()); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FINISHED, "", System.currentTimeMillis()); } - LOG.warn("{} finished, cost time:{}", job.toString(), System.currentTimeMillis() - startTime); + LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); } } public boolean cancel() { try { - LOG.warn("{} cancelled, cost time:{}", job.toString(), System.currentTimeMillis() - startTime); - job.cancel(); + LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); + task.cancel(); } catch (Exception e) { - LOG.warn(String.format("Cancel job failed job info : %s", job.toString())); + LOG.warn(String.format("Cancel job failed job info : %s", task.toString())); } finally { executor.decr(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java new file mode 100644 index 0000000000..79b0ec8c16 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
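(A note on the SQL templates in the new BaseAnalysisTask below: the ${...} tokens are ordinary Apache commons-text StringSubstitutor keys, expanded later in this patch by OlapAnalysisTask. A minimal, self-contained sketch of that mechanism; the literal values here are illustrative stand-ins, not the real FeConstants/StatisticConstants constants:)

import org.apache.commons.text.StringSubstitutor;

import java.util.HashMap;
import java.util.Map;

public class SubstitutorSketch {
    public static void main(String[] args) {
        // Illustrative values only; the real task pulls these from FeConstants,
        // StatisticConstants and the AnalysisTaskInfo being executed.
        Map<String, String> params = new HashMap<>();
        params.put("internalDB", "__internal_schema");
        params.put("columnStatTbl", "column_statistics");
        params.put("colName", "col1");
        String template = "SELECT NDV(`${colName}`) FROM ${internalDB}.${columnStatTbl}";
        // StringSubstitutor replaces every ${key} with the mapped value.
        System.out.println(new StringSubstitutor(params).replace(template));
    }
}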
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; + +import com.google.common.annotations.VisibleForTesting; + +public abstract class BaseAnalysisTask { + + protected static final String INSERT_PART_STATISTICS = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT " + + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, " + + "${catalogId} AS catalog_id, " + + "${dbId} AS db_id, " + + "${tblId} AS tbl_id, " + + "'${colId}' AS col_id, " + + "${partId} AS part_id, " + + "COUNT(1) AS row_count, " + + "NDV(`${colName}`) AS ndv, " + + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + + "${dataSizeFunction} AS data_size, " + + "NOW() "; + + protected static final String INSERT_COL_STATISTICS = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, " + + " ndv, null_count, min, max, data_size, update_time\n" + + " FROM \n" + + " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, " + + " ${catalogId} AS catalog_id, " + + " ${dbId} AS db_id, " + + " ${tblId} AS tbl_id, " + + " '${colId}' AS col_id, " + + " NULL AS part_id, " + + " SUM(count) AS row_count, \n" + + " SUM(null_count) AS null_count, " + + " MIN(CAST(min AS ${type})) AS min, " + + " MAX(CAST(max AS ${type})) AS max, " + + " SUM(data_size_in_bytes) AS data_size, " + + " NOW() AS update_time\n" + + " FROM ${internalDB}.${columnStatTbl}" + + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND " + + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND " + + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND " + + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL" + + " ) t1, \n"; + + protected AnalysisTaskScheduler analysisTaskScheduler; + + protected AnalysisTaskInfo info; + + protected CatalogIf catalog; + + protected DatabaseIf db; + + protected TableIf tbl; + + protected Column col; + + protected StmtExecutor stmtExecutor; + + protected AnalysisState analysisState; + + @VisibleForTesting + public BaseAnalysisTask() { + + } + + public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + this.analysisTaskScheduler = analysisTaskScheduler; + this.info = info; + init(info); + } + + private void init(AnalysisTaskInfo info) { + catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName); + if (catalog == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED, + String.format("Catalog with name %s does not exist", info.catalogName), System.currentTimeMillis()); + return; + } + db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null); + if (db == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED, + String.format("DB with name %s does not exist", info.dbName), System.currentTimeMillis()); + return; + } + tbl = (TableIf) db.getTable(info.tblName).orElse(null); + if (tbl == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus( + info, AnalysisState.FAILED, + String.format("Table with name %s does not exist", info.tblName), System.currentTimeMillis()); + return; + } + if (info.analysisType != null &&
info.analysisType.equals(AnalysisType.COLUMN)) { + col = tbl.getColumn(info.colName); + if (col == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus( + info, AnalysisState.FAILED, String.format("Column with name %s does not exist", info.colName), + System.currentTimeMillis()); + } + } + + } + + public abstract void execute() throws Exception; + + public void cancel() { + if (stmtExecutor != null) { + stmtExecutor.cancel(); + } + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(info, AnalysisState.FAILED, + String.format("Job has been cancelled: %s", info.toString()), -1); + } + + public int getLastExecTime() { + return info.lastExecTimeInMs; + } + + public long getJobId() { + return info.jobId; + } + + public AnalysisState getAnalysisState() { + return analysisState; + } + + protected String getDataSizeFunction(Column column) { + if (column.getType().isStringType()) { + return "SUM(LENGTH(`${colName}`))"; + } + return "COUNT(1) * " + column.getType().getSlotSize(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index c92d92dac9..9a6dc93564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -22,12 +22,12 @@ import org.apache.doris.common.Config; import org.apache.commons.lang.NotImplementedException; -public class HMSAnalysisJob extends AnalysisJob { +public class HMSAnalysisTask extends BaseAnalysisTask { protected HMSExternalTable table; - public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public HMSAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); table = (HMSExternalTable) tbl; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 216cafcb89..7bba21c692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -43,16 +43,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class HiveAnalysisJob extends HMSAnalysisJob { - private static final Logger LOG = LogManager.getLogger(HiveAnalysisJob.class); +public class HiveAnalysisTask extends HMSAnalysisTask { + private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class); public static final String TOTAL_SIZE = "totalSize"; public static final String NUM_ROWS = "numRows"; public static final String NUM_FILES = "numFiles"; public static final String TIMESTAMP = "transient_lastDdlTime"; - public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public HiveAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); } private static final String
ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java index a62052f8bf..b51fa4eb53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java @@ -38,14 +38,14 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -public class IcebergAnalysisJob extends HMSAnalysisJob { +public class IcebergAnalysisTask extends HMSAnalysisTask { private long numRows = 0; private long dataSize = 0; private long numNulls = 0; - public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public IcebergAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); } private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java new file mode 100644 index 0000000000..70f4d0c5f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
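(MVAnalysisTask below re-parses the materialized view's CREATE statement and matches each schema column back to a select-list item by name. A stand-alone sketch of that lookup shape, with plain strings standing in for Doris's SelectListItem/SlotRef types:)

import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class ColumnMatchSketch {
    public static void main(String[] args) {
        // Aliases standing in for the select-list items of an MV definition.
        List<String> selectItems = Arrays.asList("col3", "sum_col2");
        String schemaColumn = "COL3";
        // Same shape as the stream().filter(...).findFirst() lookup in MVAnalysisTask;
        // note that findFirst().get() would throw NoSuchElementException on no match.
        Optional<String> match = selectItems.stream()
                .filter(item -> item.equalsIgnoreCase(schemaColumn))
                .findFirst();
        System.out.println(match.orElse("<no matching select item>"));
    }
}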
+ +package org.apache.doris.statistics; + +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.SelectListItem; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.base.Preconditions; + +import java.io.StringReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Analysis task for a materialized view. It is constructed only when the AnalyzeStmt does not + * specify which columns to analyze. + * TODO: Support multi-table mv + */ +public class MVAnalysisTask extends BaseAnalysisTask { + + private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS + + " FROM (${sql}) mv"; + + private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS + + " (SELECT NDV(`${colName}`) AS ndv " + + " FROM (${sql}) mv) t2\n"; + + private MaterializedIndexMeta meta; + + private SelectStmt selectStmt; + + private OlapTable olapTable; + + public MVAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); + init(); + } + + private void init() { + olapTable = (OlapTable) tbl; + meta = olapTable.getIndexMetaByIndexId(info.indexId); + Preconditions.checkState(meta != null); + String mvDef = meta.getDefineStmt().originStmt; + SqlScanner input = + new SqlScanner(new StringReader(mvDef), 0L); + SqlParser parser = new SqlParser(input); + CreateMaterializedViewStmt cmv = null; + try { + cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + selectStmt = cmv.getSelectStmt(); + selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName()); + } + + @Override + public void execute() throws Exception { + for (Column column : meta.getSchema()) { + SelectStmt selectOne = (SelectStmt) selectStmt.clone(); + TableRef tableRef = selectOne.getTableRefs().get(0); + SelectListItem selectItem = selectOne.getSelectList().getItems() + .stream() + .filter(i -> isCorrespondingToColumn(i, column)) + .findFirst() + .get(); + selectItem.setAlias(column.getName()); + Map<String, String> params = new HashMap<>(); + for (Partition part : olapTable.getAllPartitions()) { + String partName = part.getName(); + PartitionNames partitionName = new PartitionNames(false, Arrays.asList(partName)); + tableRef.setPartitionNames(partitionName); + String sql = selectOne.toSql(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(meta.getIndexId())); + String colName = column.getName(); + params.put("colId", colName); + long partId = part.getId(); + params.put("partId", String.valueOf(partId)); +
params.put("dataSizeFunction", getDataSizeFunction(column)); + params.put("dbName", info.dbName); + params.put("colName", colName); + params.put("tblName", String.valueOf(info.tblName)); + params.put("sql", sql); + StatisticsUtil.execUpdate(ANALYZE_MV_PART, params); + } + params.remove("partId"); + params.put("type", column.getType().toString()); + StatisticsUtil.execUpdate(ANALYZE_MV_COL, params); + Env.getCurrentEnv().getStatisticsCache().refreshSync(meta.getIndexId(), column.getName()); + } + } + + // Based on the fact that materialized view create statement's select expr only contains basic SlotRef and + // AggregateFunction. + private boolean isCorrespondingToColumn(SelectListItem item, Column column) { + Expr expr = item.getExpr(); + if (expr instanceof SlotRef) { + SlotRef slotRef = (SlotRef) expr; + return slotRef.getColumnName().equalsIgnoreCase(column.getName()); + } + if (expr instanceof FunctionCallExpr) { + FunctionCallExpr func = (FunctionCallExpr) expr; + SlotRef slotRef = (SlotRef) func.getChild(0); + return slotRef.getColumnName().equalsIgnoreCase(column.getName()); + } + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java new file mode 100644 index 0000000000..23ea5ea6a6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.text.StringSubstitutor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Each task analyze one column. 
+ */ +public class OlapAnalysisTask extends BaseAnalysisTask { + + private static final String ANALYZE_PARTITION_SQL_TEMPLATE = INSERT_PART_STATISTICS + + "FROM `${dbName}`.`${tblName}` " + + "PARTITION ${partName}"; + + private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS + + " (SELECT NDV(`${colName}`) AS ndv " + + " FROM `${dbName}`.`${tblName}`) t2\n"; + + @VisibleForTesting + public OlapAnalysisTask() { + super(); + } + + public OlapAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); + } + + public void execute() throws Exception { + Map params = new HashMap<>(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(tbl.getId())); + params.put("colId", String.valueOf(info.colName)); + params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dbName", info.dbName); + params.put("colName", String.valueOf(info.colName)); + params.put("tblName", String.valueOf(info.tblName)); + List partitionAnalysisSQLs = new ArrayList<>(); + try { + tbl.readLock(); + Set partNames = tbl.getPartitionNames(); + for (String partName : partNames) { + Partition part = tbl.getPartition(partName); + if (part == null) { + continue; + } + params.put("partId", String.valueOf(tbl.getPartition(partName).getId())); + params.put("partName", String.valueOf(partName)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); + } + } finally { + tbl.readUnlock(); + } + execSQLs(partitionAnalysisSQLs); + params.remove("partId"); + params.put("type", col.getType().toString()); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); + execSQL(sql); + Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); + } + + @VisibleForTesting + public void execSQLs(List partitionAnalysisSQLs) throws Exception { + for (String sql : partitionAnalysisSQLs) { + execSQL(sql); + } + } + + @VisibleForTesting + public void execSQL(String sql) throws Exception { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java index c369da43b9..62cc5638ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java @@ -17,7 +17,6 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -260,21 +259,6 @@ public class StatisticsJob { } } - /** - * get statisticsJob from analyzeStmt. 
- * AnalyzeStmt: analyze t1(c1,c2,c3) - * tableId: [t1] - * tableIdToColumnName: {t1: [c1,c2,c3]} - */ - public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws AnalysisException { - long dbId = stmt.getDbId(); - Set tblIds = stmt.getTblIds(); - Map> tableIdToPartitionName = stmt.getTableIdToPartitionName(); - Map> tableIdToColumnName = stmt.getTableIdToColumnName(); - Map properties = stmt.getProperties(); - return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, tableIdToColumnName, properties); - } - public List getShowInfo(@Nullable Long tableId) throws AnalysisException { List result = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java index 37966dc1fe..dbdc202f0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java @@ -17,20 +17,16 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.OrderByPair; -import org.apache.doris.qe.ConnectContext; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -61,37 +57,6 @@ public class StatisticsJobManager { return idToStatisticsJob; } - public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException { - // The current statistics are only used for CBO test, - // and are not available to users. (work in progress) - // TODO(wzt): Further tests are needed - boolean enableCboStatistics = ConnectContext.get() - .getSessionVariable().getEnableCboStatistics(); - if (enableCboStatistics) { - // step1: init statistics job by analyzeStmt - StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt); - synchronized (this) { - // step2: check restrict - checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds()); - // step3: create it - createStatisticsJob(statisticsJob); - } - } else { - throw new UserException("Statistics are not yet stable, if you want to enable statistics," - + " use 'set enable_cbo_statistics=true' to enable it."); - } - } - - public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException { - idToStatisticsJob.put(statisticsJob.getId(), statisticsJob); - try { - Env.getCurrentEnv().getStatisticsJobScheduler().addPendingJob(statisticsJob); - } catch (IllegalStateException e) { - LOG.info("The pending statistics job is full. 
Please submit it again later."); - throw new DdlException("The pending statistics job is full, Please submit it again later."); - } - } - /** * The statistical job has the following restrict: * - Rule1: The same table cannot have two unfinished statistics jobs diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index d1b9c8f4f4..715ec14464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -18,16 +18,12 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AlterColumnStatsStmt; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; -import org.apache.doris.statistics.AnalysisJobInfo.AnalysisType; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; @@ -71,8 +67,9 @@ public class StatisticsRepository { + " WHERE `id` IN (${idList})"; private static final String PERSIST_ANALYSIS_JOB_SQL_TEMPLATE = "INSERT INTO " - + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, '${catalogName}', '${dbName}'," - + "'${tblName}','${colName}', '${jobType}', '${analysisType}', '${message}', '${lastExecTimeInMs}'," + + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}'," + + "'${tblName}','${colName}','${indexId}','${jobType}', '${analysisType}', " + + "'${message}', '${lastExecTimeInMs}'," + "'${state}', '${scheduleType}')"; private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO " @@ -134,39 +131,23 @@ public class StatisticsRepository { return stringJoiner.toString(); } - public static void createAnalysisJob(AnalyzeStmt analyzeStmt) { - String catalogName = analyzeStmt.getCatalogName(); - String db = analyzeStmt.getDBName(); - String tbl = analyzeStmt.getTblName(); - List<String> colNames = analyzeStmt.getOptColumnNames(); - - if (colNames != null) { - for (String colName : colNames) { - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db, - tbl, colName, AnalysisJobInfo.JobType.MANUAL, ScheduleType.ONCE); - analysisJobInfo.analysisType = AnalysisType.FULL; - Map<String, String> params = new HashMap<>(); - params.put("jobId", String.valueOf(analysisJobInfo.jobId)); - params.put("catalogName", analysisJobInfo.catalogName); - params.put("dbName", analysisJobInfo.dbName); - params.put("tblName", analysisJobInfo.tblName); - params.put("colName", analysisJobInfo.colName); - params.put("jobType", analysisJobInfo.jobType.toString()); - params.put("analysisType", analysisJobInfo.analysisType.toString()); - params.put("message", ""); - params.put("lastExecTimeInMs", "0"); - params.put("state", JobState.PENDING.toString()); - params.put("scheduleType", analysisJobInfo.scheduleType.toString()); - try { - StatisticsUtil.execUpdate( - new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE)); - } catch (Exception e) { - LOG.warn("Failed to
persite job for column: {}", colName, e); - return; - } - Env.getCurrentEnv().getAnalysisJobScheduler().schedule(analysisJobInfo); - } - } + public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception { + Map<String, String> params = new HashMap<>(); + params.put("jobId", String.valueOf(analysisTaskInfo.jobId)); + params.put("taskId", String.valueOf(analysisTaskInfo.taskId)); + params.put("catalogName", analysisTaskInfo.catalogName); + params.put("dbName", analysisTaskInfo.dbName); + params.put("tblName", analysisTaskInfo.tblName); + params.put("colName", analysisTaskInfo.colName); + params.put("indexId", String.valueOf(analysisTaskInfo.indexId)); + params.put("jobType", analysisTaskInfo.jobType.toString()); + params.put("analysisType", analysisTaskInfo.analysisType.toString()); + params.put("message", ""); + params.put("lastExecTimeInMs", "0"); + params.put("state", AnalysisState.PENDING.toString()); + params.put("scheduleType", analysisTaskInfo.scheduleType.toString()); + StatisticsUtil.execUpdate( + new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE)); } public static void alterColumnStatistics(AlterColumnStatsStmt alterColumnStatsStmt) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index baa5ff1395..16a4a66ffc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -17,7 +17,6 @@ package org.apache.doris.statistics.util; - import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.DateLiteral; @@ -46,7 +45,7 @@ import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisTaskInfo; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; @@ -90,13 +89,11 @@ public class StatisticsUtil { StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); stmtExecutor.execute(); - } finally { - ConnectContext.remove(); } } // TODO: finish this.
- public static List deserializeToAnalysisJob(List resultBatches) throws TException { + public static List deserializeToAnalysisJob(List resultBatches) throws TException { return new ArrayList<>(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java index 019d4d0d90..9e0b8c2a46 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java @@ -41,7 +41,7 @@ public class SqlModeTest { } catch (Exception e) { Assert.fail(e.getMessage()); } - Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); + Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_DEFAULT)); try { @@ -49,7 +49,7 @@ public class SqlModeTest { } catch (Exception e) { Assert.fail(e.getMessage()); } - Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); + Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index e2cc43a617..658b9d8ba1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -28,7 +28,6 @@ import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -96,7 +95,6 @@ public class TabletRepairAndBalanceTest { static { try { - InternalSchemaInitializer.forTest = true; tag1 = Tag.create(Tag.TYPE_LOCATION, "zone1"); tag2 = Tag.create(Tag.TYPE_LOCATION, "zone2"); } catch (AnalysisException e) { @@ -106,6 +104,7 @@ public class TabletRepairAndBalanceTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; FeConstants.tablet_checker_interval_ms = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index f7b5e0eda3..8d90e1a82e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; @@ -72,7 +71,7 @@ public class TabletReplicaTooSlowTest { @BeforeClass public static void beforeClass() throws Exception { - InternalSchemaInitializer.forTest = true; + FeConstants.runningUnitTest = true; System.out.println(runningDir); 
FeConstants.runningUnitTest = true; FeConstants.tablet_checker_interval_ms = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 25960fa5d4..95fb22eac6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -27,6 +27,7 @@ import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.util.List; @@ -38,6 +39,16 @@ public class DecommissionBackendTest extends TestWithFeService { return 3; } + @Override + protected void beforeCluster() { + FeConstants.runningUnitTest = true; + } + + @BeforeAll + public void beforeClass() { + FeConstants.runningUnitTest = true; + } + @Override protected void beforeCreatingConnectContext() throws Exception { FeConstants.default_scheduler_interval_millisecond = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java index 691da60173..0c43ee6338 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java @@ -17,9 +17,18 @@ package org.apache.doris.nereids.datasets.ssb; +import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; +import org.junit.jupiter.api.BeforeAll; + public abstract class SSBTestBase extends AnalyzeCheckTestBase { + + @BeforeAll + public void beforeClass() { + FeConstants.runningUnitTest = true; + } + @Override protected void runBeforeAll() throws Exception { createDatabase("test"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java deleted file mode 100644 index e16f416368..0000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
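(The deleted AnalysisJobExecutorTest below, re-added further down as AnalysisTaskExecutorTest, exercises expiry-based cancellation: a queued wrapper whose startTime is ancient must get cancelled. A self-contained sketch of that pattern; the timeout and task type here are invented for illustration:)

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

public class ExpirySketch {
    static final long TIMEOUT_MS = 10_000; // invented; the real limit lives in the executor

    static class TimedTask extends FutureTask<Void> {
        final long startTime;
        TimedTask(long startTime) {
            super(() -> null);
            this.startTime = startTime;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<TimedTask> queue = new LinkedBlockingQueue<>();
        queue.put(new TimedTask(5)); // ancient start time, like the test's startTime = 5
        TimedTask head = queue.peek();
        // Cancel tasks that have been running longer than the timeout, which is
        // what the tests assert via the expected cancel() invocation.
        if (head != null && System.currentTimeMillis() - head.startTime > TIMEOUT_MS) {
            queue.poll().cancel(true);
            System.out.println("expired task cancelled");
        }
    }
}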
- -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; -import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.statistics.AnalysisJobInfo.JobType; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; -import org.apache.doris.statistics.util.BlockingCounter; -import org.apache.doris.utframe.TestWithFeService; - -import mockit.Expectations; -import mockit.Mocked; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.BlockingQueue; - -public class AnalysisJobExecutorTest extends TestWithFeService { - - @Mocked - AnalysisJobScheduler analysisJobScheduler; - - @Override - protected void runBeforeAll() throws Exception { - try { - InternalSchemaInitializer.createDB(); - createDatabase("analysis_job_test"); - connectContext.setDatabase("default_cluster:analysis_job_test"); - createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" - - + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" - + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" - + ");"); - InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer(); - Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Test - public void testExpiredJobCancellation() throws Exception { - AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler); - BlockingQueue b = Deencapsulation.getField(analysisJobExecutor, "jobQueue"); - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - AnalysisJob analysisJob = new AnalysisJob(analysisJobScheduler, analysisJobInfo); - AnalysisJobWrapper analysisJobWrapper = new AnalysisJobWrapper(analysisJobExecutor, analysisJob); - Deencapsulation.setField(analysisJobWrapper, "startTime", 5); - b.put(analysisJobWrapper); - new Expectations() { - { - analysisJobWrapper.cancel(); - times = 1; - } - }; - analysisJobExecutor.start(); - BlockingCounter counter = Deencapsulation.getField(analysisJobExecutor, "blockingCounter"); - Assertions.assertEquals(0, counter.getVal()); - } - - @Test - public void testJobExecution() throws Exception { - AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler); - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - AnalysisJob job = new AnalysisJob(analysisJobScheduler, analysisJobInfo); - new Expectations() { - { - analysisJobScheduler.getPendingJobs(); - result = job; - job.execute(); - times = 1; - } - }; - Deencapsulation.invoke(analysisJobExecutor, "doFetchAndExecute"); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index dfe7617b60..a01dd8f7cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -22,8 +22,9 @@ import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import 
org.apache.doris.statistics.AnalysisJobInfo.JobType; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.utframe.TestWithFeService; @@ -54,10 +55,10 @@ public class AnalysisJobTest extends TestWithFeService { } @Test - public void testCreateAnalysisJob(@Mocked AnalysisJobScheduler scheduler) throws Exception { + public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception { new Expectations() { { - scheduler.schedule((AnalysisJobInfo) any); + scheduler.schedule((AnalysisTaskInfo) any); times = 3; } }; @@ -86,7 +87,7 @@ public class AnalysisJobTest extends TestWithFeService { } @Test - public void testJobExecution(@Mocked AnalysisJobScheduler scheduler, @Mocked StmtExecutor stmtExecutor) + public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, @Mocked StmtExecutor stmtExecutor) throws Exception { new MockUp() { @@ -105,13 +106,12 @@ public class AnalysisJobTest extends TestWithFeService { times = 2; } }; - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - new AnalysisJob(scheduler, analysisJobInfo).execute(); + AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + new OlapAnalysisTask(scheduler, analysisJobInfo).execute(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java new file mode 100644 index 0000000000..f9fcbaf55f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
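(The new AnalysisTaskExecutorTest below stubs its collaborators with JMockit. A minimal sketch of the MockUp/@Mock mechanism it relies on; the class names are invented, and JMockit code like this runs under the JMockit java agent:)

import mockit.Mock;
import mockit.MockUp;

public class MockUpSketch {
    static class Scheduler {
        String pending() { return "real task"; }
    }

    public static void main(String[] args) {
        // Redefines Scheduler.pending() for subsequent calls, the same way the
        // test stubs AnalysisTaskScheduler.getPendingTasks() to hand back a task.
        new MockUp<Scheduler>() {
            @Mock
            String pending() { return "stubbed task"; }
        };
        System.out.println(new Scheduler().pending()); // prints "stubbed task"
    }
}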
+ +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.util.BlockingCounter; +import org.apache.doris.utframe.TestWithFeService; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; + +public class AnalysisTaskExecutorTest extends TestWithFeService { + + @Mocked + AnalysisTaskScheduler analysisTaskScheduler; + + @Override + protected void runBeforeAll() throws Exception { + try { + InternalSchemaInitializer.createDB(); + createDatabase("analysis_job_test"); + connectContext.setDatabase("default_cluster:analysis_job_test"); + createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" + + + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" + + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" + + ");"); + InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer(); + Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testExpiredJobCancellation() throws Exception { + AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); + BlockingQueue b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue"); + AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisTaskScheduler, analysisJobInfo); + AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob); + Deencapsulation.setField(analysisTaskWrapper, "startTime", 5); + b.put(analysisTaskWrapper); + new Expectations() { + { + analysisTaskWrapper.cancel(); + times = 1; + } + }; + analysisTaskExecutor.start(); + BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter"); + Assertions.assertEquals(0, counter.getVal()); + } + + @Test + public void testTaskExecution() throws Exception { + AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); + AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskScheduler, analysisTaskInfo); + new MockUp() { + @Mock + public synchronized BaseAnalysisTask getPendingTasks() { + return task; + } + }; + new MockUp() { + @Mock + public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {} + }; + new Expectations() { + { + task.execute(); + times = 1; + } + }; + 
Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java new file mode 100644 index 0000000000..7188342b83 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.utframe.TestWithFeService; + +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Tested; +import org.junit.jupiter.api.Test; + +public class MVStatisticsTest extends TestWithFeService { + + @Injectable + StatisticsCache statisticsCache; + + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ":" + "test"); + createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" + + "DISTRIBUTED BY HASH(col3)\n" + + "BUCKETS 1\n" + + "PROPERTIES(\n" + + " \"replication_num\"=\"1\"\n" + + ");\n"); + createMv("CREATE MATERIALIZED VIEW mv1 AS SELECT col3 , SUM(COL2) FROM t1 group by col3"); + } + + @Tested + + @Test + public void testCreate() throws Exception { + new Expectations() { + { + statisticsCache.refreshSync(anyLong, anyString); + times = 5; + } + }; + new MockUp() { + }; + new MockUp() { + + @Mock + public void execUpdate(String sql) throws Exception {} + }; + new MockUp(OlapAnalysisTask.class) { + + @Mock + public void execSQL(String sql) throws Exception {} + }; + new MockUp() { + + @Mock + public StatisticsCache getStatisticsCache() { + return statisticsCache; + } + }; + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + Deencapsulation.setField(analysisManager, "statisticsCache", statisticsCache); + getSqlStmtExecutor("analyze t1"); + Thread.sleep(3000); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 90b0e4658d..5ecce69719 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -41,7 +41,6 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import 
org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Table; @@ -121,13 +120,16 @@ public abstract class TestWithFeService { @BeforeAll public final void beforeAll() throws Exception { - InternalSchemaInitializer.forTest = true; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); + beforeCluster(); createDorisCluster(); runBeforeAll(); } + protected void beforeCluster() { + } + @AfterAll public final void afterAll() throws Exception { runAfterAll();