From e5000c708edb212bd747806bdfa79c6325840a12 Mon Sep 17 00:00:00 2001 From: Kikyou1997 <33112463+Kikyou1997@users.noreply.github.com> Date: Thu, 1 Dec 2022 22:34:13 +0800 Subject: [PATCH] [feature](statistics) Support for collecting statistics on materialized view (#14676) 1. Map muiltiple tasks to one Job 2. Remove the codes for analyzing whole default db, since this feature is not available and would create too many tasks and related code is confusing 3. support analyze materialized view 4. abstract the common logic to BaseTask --- fe/fe-core/src/main/cup/sql_parser.cup | 2 +- .../apache/doris/analysis/AnalyzeStmt.java | 252 +++++------------- .../apache/doris/analysis/SelectListItem.java | 3 + .../org/apache/doris/analysis/SelectStmt.java | 28 +- .../org/apache/doris/analysis/TableRef.java | 16 ++ .../java/org/apache/doris/catalog/Env.java | 24 +- .../catalog/InternalSchemaInitializer.java | 23 +- .../org/apache/doris/catalog/OlapTable.java | 17 +- .../java/org/apache/doris/catalog/Table.java | 8 +- .../org/apache/doris/catalog/TableIf.java | 8 +- .../doris/catalog/external/ExternalTable.java | 8 +- .../catalog/external/HMSExternalTable.java | 16 +- .../org/apache/doris/qe/StmtExecutor.java | 6 +- .../apache/doris/statistics/AnalysisJob.java | 207 -------------- .../doris/statistics/AnalysisManager.java | 145 ++++++++++ .../doris/statistics/AnalysisState.java | 25 ++ ...xecutor.java => AnalysisTaskExecutor.java} | 34 +-- ...ysisJobInfo.java => AnalysisTaskInfo.java} | 72 +++-- .../statistics/AnalysisTaskInfoBuilder.java | 115 ++++++++ ...eduler.java => AnalysisTaskScheduler.java} | 67 ++--- ...bWrapper.java => AnalysisTaskWrapper.java} | 38 +-- .../doris/statistics/BaseAnalysisTask.java | 160 +++++++++++ ...SAnalysisJob.java => HMSAnalysisTask.java} | 6 +- ...AnalysisJob.java => HiveAnalysisTask.java} | 8 +- ...lysisJob.java => IcebergAnalysisTask.java} | 6 +- .../doris/statistics/MVAnalysisTask.java | 144 ++++++++++ .../doris/statistics/OlapAnalysisTask.java | 110 ++++++++ .../doris/statistics/StatisticsJob.java | 16 -- .../statistics/StatisticsJobManager.java | 35 --- .../statistics/StatisticsRepository.java | 59 ++-- .../doris/statistics/util/StatisticsUtil.java | 7 +- .../apache/doris/analysis/SqlModeTest.java | 4 +- .../clone/TabletRepairAndBalanceTest.java | 3 +- .../doris/clone/TabletReplicaTooSlowTest.java | 3 +- .../cluster/DecommissionBackendTest.java | 11 + .../nereids/datasets/ssb/SSBTestBase.java | 9 + .../statistics/AnalysisJobExecutorTest.java | 103 ------- .../doris/statistics/AnalysisJobTest.java | 24 +- .../statistics/AnalysisTaskExecutorTest.java | 112 ++++++++ .../doris/statistics/MVStatisticsTest.java | 85 ++++++ .../doris/utframe/TestWithFeService.java | 6 +- 41 files changed, 1230 insertions(+), 795 deletions(-) delete mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java rename fe/fe-core/src/main/java/org/apache/doris/statistics/{AnalysisJobExecutor.java => AnalysisTaskExecutor.java} (75%) rename fe/fe-core/src/main/java/org/apache/doris/statistics/{AnalysisJobInfo.java => AnalysisTaskInfo.java} (65%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java rename fe/fe-core/src/main/java/org/apache/doris/statistics/{AnalysisJobScheduler.java => AnalysisTaskScheduler.java} (54%) rename fe/fe-core/src/main/java/org/apache/doris/statistics/{AnalysisJobWrapper.java => AnalysisTaskWrapper.java} (63%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java rename fe/fe-core/src/main/java/org/apache/doris/statistics/{HMSAnalysisJob.java => HMSAnalysisTask.java} (89%) rename fe/fe-core/src/main/java/org/apache/doris/statistics/{HiveAnalysisJob.java => HiveAnalysisTask.java} (97%) rename fe/fe-core/src/main/java/org/apache/doris/statistics/{IcebergAnalysisJob.java => IcebergAnalysisTask.java} (96%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java delete mode 100644 fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java create mode 100644 fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index 68d209ffe6..0735311f09 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -2619,7 +2619,7 @@ show_create_routine_load_stmt ::= // analyze statment analyze_stmt ::= - KW_ANALYZE opt_table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties + KW_ANALYZE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties {: RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties); :} diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java index f99e172c56..f5ea7fabb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AnalyzeStmt.java @@ -23,7 +23,6 @@ import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Table; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; @@ -39,14 +38,12 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -73,198 +70,69 @@ public class AnalyzeStmt extends DdlStmt { private static final Predicate DESIRED_TASK_TIMEOUT_SEC = (v) -> v > 0L; - private final TableName optTableName; + public final boolean wholeTbl; + + private final TableName tableName; + + private TableIf table; + private final PartitionNames optPartitionNames; private List optColumnNames; private Map optProperties; // after analyzed private long dbId; - private final Set tblIds = Sets.newHashSet(); + private final List partitionNames = Lists.newArrayList(); - // TODO(wzt): support multiple tables - public AnalyzeStmt(TableName optTableName, + public AnalyzeStmt(TableName tableName, List optColumnNames, PartitionNames optPartitionNames, Map optProperties) { - this.optTableName = optTableName; + this.tableName = tableName; this.optColumnNames = optColumnNames; this.optPartitionNames = optPartitionNames; + wholeTbl = CollectionUtils.isEmpty(optColumnNames); this.optProperties = optProperties; } - public long getDbId() { - Preconditions.checkArgument(isAnalyzed(), - "The dbId must be obtained after the parsing is complete"); - return dbId; - } - - public Set getTblIds() { - Preconditions.checkArgument(isAnalyzed(), - "The tblIds must be obtained after the parsing is complete"); - return tblIds; - } - - public Database getDb() throws AnalysisException { - Preconditions.checkArgument(isAnalyzed(), - "The db must be obtained after the parsing is complete"); - return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId); - } - - public List getTables() throws AnalysisException { - Preconditions.checkArgument(isAnalyzed(), - "The tables must be obtained after the parsing is complete"); - Database db = getDb(); - List
tables = Lists.newArrayList(); - - db.readLock(); - try { - for (Long tblId : tblIds) { - Table table = db.getTableOrAnalysisException(tblId); - tables.add(table); - } - } finally { - db.readUnlock(); - } - - return tables; - } - - public List getPartitionNames() { - Preconditions.checkArgument(isAnalyzed(), - "The partitionNames must be obtained after the parsing is complete"); - return partitionNames; - } - - /** - * The statistics task obtains partitions and then collects partition statistics, - * we need to filter out partitions that do not have data. - * - * @return map of tableId and partitionName - * @throws AnalysisException not analyzed - */ - public Map> getTableIdToPartitionName() throws AnalysisException { - Preconditions.checkArgument(isAnalyzed(), - "The partitionIds must be obtained after the parsing is complete"); - Map> tableIdToPartitionName = Maps.newHashMap(); - - for (Table table : getTables()) { - table.readLock(); - try { - OlapTable olapTable = (OlapTable) table; - List partitionNames = getPartitionNames(); - List newPartitionNames = new ArrayList<>(partitionNames); - if (newPartitionNames.isEmpty() && olapTable.isPartitioned()) { - newPartitionNames.addAll(olapTable.getPartitionNames()); - } - tableIdToPartitionName.put(table.getId(), newPartitionNames); - } finally { - table.readUnlock(); - } - } - return tableIdToPartitionName; - } - - public Map> getTableIdToColumnName() throws AnalysisException { - Preconditions.checkArgument(isAnalyzed(), - "The db name must be obtained after the parsing is complete"); - Map> tableIdToColumnName = Maps.newHashMap(); - List
tables = getTables(); - if (optColumnNames == null || optColumnNames.isEmpty()) { - for (Table table : tables) { - table.readLock(); - try { - long tblId = table.getId(); - List baseSchema = table.getBaseSchema(); - List colNames = Lists.newArrayList(); - baseSchema.stream().map(Column::getName).forEach(colNames::add); - tableIdToColumnName.put(tblId, colNames); - } finally { - table.readUnlock(); - } - } - } else { - for (Long tblId : tblIds) { - tableIdToColumnName.put(tblId, optColumnNames); - } - } - - return tableIdToColumnName; - } - - public Map getProperties() { - return optProperties; - } - @Override public void analyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); - // step1: analyze db, table and column - if (optTableName != null) { - optTableName.analyze(analyzer); + tableName.analyze(analyzer); - String catalogName = optTableName.getCtl(); - String dbName = optTableName.getDb(); - String tblName = optTableName.getTbl(); - CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName); - DatabaseIf db = catalog.getDbOrAnalysisException(dbName); - TableIf table = db.getTableOrAnalysisException(tblName); + String catalogName = tableName.getCtl(); + String dbName = tableName.getDb(); + String tblName = tableName.getTbl(); + CatalogIf catalog = analyzer.getEnv().getCatalogMgr().getCatalog(catalogName); + DatabaseIf db = catalog.getDbOrAnalysisException(dbName); + table = db.getTableOrAnalysisException(tblName); - checkAnalyzePriv(dbName, tblName); + checkAnalyzePriv(dbName, tblName); - if (optColumnNames != null && !optColumnNames.isEmpty()) { - table.readLock(); - try { - List baseSchema = table.getBaseSchema(false) - .stream().map(Column::getName).collect(Collectors.toList()); - Optional optional = optColumnNames.stream() - .filter(entity -> !baseSchema.contains(entity)).findFirst(); - if (optional.isPresent()) { - String columnName = optional.get(); - ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME, - columnName, FeNameFormat.getColumnNameRegex()); - } - } finally { - table.readUnlock(); - } - } else { - optColumnNames = table.getBaseSchema(false) - .stream().map(Column::getName).collect(Collectors.toList()); - } - - dbId = db.getId(); - tblIds.add(table.getId()); - } else { - // analyze the current default db - String dbName = analyzer.getDefaultDb(); - if (Strings.isNullOrEmpty(dbName)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_NO_DB_ERROR); - } - Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName); - - db.readLock(); + if (optColumnNames != null && !optColumnNames.isEmpty()) { + table.readLock(); try { - List
tables = db.getTables(); - for (Table table : tables) { - checkAnalyzeType(table); - checkAnalyzePriv(dbName, table.getName()); - } - - dbId = db.getId(); - for (Table table : tables) { - long tblId = table.getId(); - tblIds.add(tblId); + List baseSchema = table.getBaseSchema(false) + .stream().map(Column::getName).collect(Collectors.toList()); + Optional optional = optColumnNames.stream() + .filter(entity -> !baseSchema.contains(entity)).findFirst(); + if (optional.isPresent()) { + String columnName = optional.get(); + ErrorReport.reportAnalysisException(ErrorCode.ERR_WRONG_COLUMN_NAME, + columnName, FeNameFormat.getColumnNameRegex()); } } finally { - db.readUnlock(); + table.readUnlock(); } + } else { + optColumnNames = table.getBaseSchema(false) + .stream().map(Column::getName).collect(Collectors.toList()); } - + dbId = db.getId(); // step2: analyze partition checkPartitionNames(); - // step3: analyze properties checkProperties(); } @@ -286,18 +154,12 @@ public class AnalyzeStmt extends DdlStmt { } } - private void checkAnalyzeType(Table table) throws AnalysisException { - if (table.getType() != Table.TableType.OLAP) { - throw new AnalysisException("Only OLAP table statistics are supported"); - } - } - private void checkPartitionNames() throws AnalysisException { if (optPartitionNames != null) { optPartitionNames.analyze(analyzer); - if (optTableName != null) { - Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(optTableName.getDb()); - OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(optTableName.getTbl()); + if (tableName != null) { + Database db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tableName.getDb()); + OlapTable olapTable = (OlapTable) db.getTableOrAnalysisException(tableName.getTbl()); if (!olapTable.isPartitioned()) { throw new AnalysisException("Not a partitioned table: " + olapTable.getName()); } @@ -340,9 +202,9 @@ public class AnalyzeStmt extends DdlStmt { StringBuilder sb = new StringBuilder(); sb.append("ANALYZE"); - if (optTableName != null) { + if (tableName != null) { sb.append(" "); - sb.append(optTableName.toSql()); + sb.append(tableName.toSql()); } if (optColumnNames != null) { @@ -369,18 +231,46 @@ public class AnalyzeStmt extends DdlStmt { } public String getCatalogName() { - return optTableName.getCtl(); + return tableName.getCtl(); } public String getDBName() { - return optTableName.getDb(); + return tableName.getDb(); } - public String getTblName() { - return optTableName.getTbl(); + public TableName getTblName() { + return tableName; } public List getOptColumnNames() { return optColumnNames; } + + + public long getDbId() { + Preconditions.checkArgument(isAnalyzed(), + "The dbId must be obtained after the parsing is complete"); + return dbId; + } + + public Database getDb() throws AnalysisException { + Preconditions.checkArgument(isAnalyzed(), + "The db must be obtained after the parsing is complete"); + return analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbId); + } + + public TableIf getTable() { + return table; + } + + public List getPartitionNames() { + Preconditions.checkArgument(isAnalyzed(), + "The partitionNames must be obtained after the parsing is complete"); + return partitionNames; + } + + public Map getProperties() { + return optProperties; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java index 643da0095a..b757619867 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectListItem.java @@ -132,4 +132,7 @@ public class SelectListItem { return expr.toColumnLabel(); } + public void setAlias(String alias) { + this.alias = alias; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java index 3f754595f1..b03ec0a691 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java @@ -1833,18 +1833,26 @@ public class SelectStmt extends QueryStmt { if (selectList.isDistinct()) { strBuilder.append("DISTINCT "); } - for (int i = 0; i < resultExprs.size(); ++i) { - // strBuilder.append(selectList.getItems().get(i).toSql()); - // strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); - if (i != 0) { - strBuilder.append(", "); + ConnectContext ctx = ConnectContext.get(); + if (ctx == null || ctx.getSessionVariable().internalSession) { + for (int i = 0; i < selectList.getItems().size(); i++) { + strBuilder.append(selectList.getItems().get(i).toSql()); + strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); } - if (needToSql) { - strBuilder.append(originalExpr.get(i).toSql()); - } else { - strBuilder.append(resultExprs.get(i).toSql()); + } else { + for (int i = 0; i < resultExprs.size(); ++i) { + // strBuilder.append(selectList.getItems().get(i).toSql()); + // strBuilder.append((i + 1 != selectList.getItems().size()) ? ", " : ""); + if (i != 0) { + strBuilder.append(", "); + } + if (needToSql) { + strBuilder.append(originalExpr.get(i).toSql()); + } else { + strBuilder.append(resultExprs.get(i).toSql()); + } + strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i))); } - strBuilder.append(" AS ").append(SqlUtils.getIdentSql(colLabels.get(i))); } // From clause diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java index 0aa5d59291..f5c37e5fd2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.StringJoiner; /** * Superclass of all table references, including references to views, base tables @@ -226,6 +227,13 @@ public class TableRef implements ParseNode, Writable { output.append("[").append(Joiner.on(", ").join(joinHints)).append("] "); } output.append(tableRefToSql()).append(" "); + if (partitionNames != null) { + StringJoiner sj = new StringJoiner(",", "", " "); + for (String partName : partitionNames.getPartitionNames()) { + sj.add(partName); + } + output.append(sj.toString()); + } if (usingColNames != null) { output.append("USING (").append(Joiner.on(", ").join(usingColNames)).append(")"); } else if (onClause != null) { @@ -864,4 +872,12 @@ public class TableRef implements ParseNode, Writable { aliases = new String[]{alias}; } } + + public void setPartitionNames(PartitionNames partitionNames) { + this.partitionNames = partitionNames; + } + + public void setName(TableName name) { + this.name = name; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index f57f262526..e29713731f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -208,12 +208,12 @@ import org.apache.doris.qe.JournalObservable; import org.apache.doris.qe.VariableMgr; import org.apache.doris.resource.Tag; import org.apache.doris.service.FrontendOptions; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisManager; +import org.apache.doris.statistics.AnalysisTaskScheduler; import org.apache.doris.statistics.StatisticsCache; import org.apache.doris.statistics.StatisticsJobManager; import org.apache.doris.statistics.StatisticsJobScheduler; import org.apache.doris.statistics.StatisticsManager; -import org.apache.doris.statistics.StatisticsRepository; import org.apache.doris.statistics.StatisticsTaskScheduler; import org.apache.doris.system.Backend; import org.apache.doris.system.FQDNManager; @@ -441,9 +441,7 @@ public class Env { private MTMVJobManager mtmvJobManager; - private final AnalysisJobScheduler analysisJobScheduler; - - private final StatisticsCache statisticsCache; + private AnalysisManager analysisManager; private ExternalMetaCacheMgr extMetaCacheMgr; @@ -647,10 +645,9 @@ public class Env { this.refreshManager = new RefreshManager(); this.policyMgr = new PolicyMgr(); this.mtmvJobManager = new MTMVJobManager(); - this.analysisJobScheduler = new AnalysisJobScheduler(); - this.statisticsCache = new StatisticsCache(); this.extMetaCacheMgr = new ExternalMetaCacheMgr(); this.fqdnManager = new FQDNManager(systemInfo); + this.analysisManager = new AnalysisManager(); } public static void destroyCheckpoint() { @@ -1653,7 +1650,7 @@ public class Env { } public StatisticsCache getStatisticsCache() { - return statisticsCache; + return analysisManager.getStatisticsCache(); } public boolean hasReplayer() { @@ -5233,8 +5230,8 @@ public class Env { return count; } - public AnalysisJobScheduler getAnalysisJobScheduler() { - return analysisJobScheduler; + public AnalysisTaskScheduler getAnalysisJobScheduler() { + return analysisManager.taskScheduler; } // TODO: @@ -5242,6 +5239,11 @@ public class Env { // 2. support sample job // 3. support period job public void createAnalysisJob(AnalyzeStmt analyzeStmt) { - StatisticsRepository.createAnalysisJob(analyzeStmt); + analysisManager.createAnalysisJob(analyzeStmt); } + + public AnalysisManager getAnalysisManager() { + return analysisManager; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java index 77294b3b76..8cafd7d777 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchemaInitializer.java @@ -44,24 +44,22 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; public class InternalSchemaInitializer extends Thread { private static final Logger LOG = LogManager.getLogger(InternalSchemaInitializer.class); - public static boolean forTest = false; - /** * If internal table creation failed, will retry after below seconds. */ public static final int TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS = 1; - public void run() { - if (forTest) { + if (FeConstants.runningUnitTest) { return; } - while (true) { + while (!created()) { FrontendNodeType feType = Env.getCurrentEnv().getFeType(); if (feType.equals(FrontendNodeType.INIT) || feType.equals(FrontendNodeType.UNKNOWN)) { LOG.warn("FE is not ready"); @@ -72,7 +70,6 @@ public class InternalSchemaInitializer extends Thread { .join(TABLE_CREATION_RETRY_INTERVAL_IN_SECONDS * 1000L); createDB(); createTbl(); - break; } catch (Throwable e) { LOG.warn("Statistics storage initiated failed, will try again later", e); } @@ -145,10 +142,12 @@ public class InternalSchemaInitializer extends Thread { FeConstants.INTERNAL_DB_NAME, StatisticConstants.ANALYSIS_JOB_TABLE); List columnDefs = new ArrayList<>(); columnDefs.add(new ColumnDef("job_id", TypeDef.create(PrimitiveType.BIGINT))); + columnDefs.add(new ColumnDef("task_id", TypeDef.create(PrimitiveType.BIGINT))); columnDefs.add(new ColumnDef("catalog_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("db_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("tbl_name", TypeDef.createVarchar(1024))); columnDefs.add(new ColumnDef("col_name", TypeDef.createVarchar(1024))); + columnDefs.add(new ColumnDef("index_id", TypeDef.create(PrimitiveType.BIGINT))); columnDefs.add(new ColumnDef("job_type", TypeDef.createVarchar(32))); columnDefs.add(new ColumnDef("analysis_type", TypeDef.createVarchar(32))); columnDefs.add(new ColumnDef("message", TypeDef.createVarchar(1024))); @@ -175,4 +174,16 @@ public class InternalSchemaInitializer extends Thread { return createTableStmt; } + private boolean created() { + Optional optionalDatabase = + Env.getCurrentEnv().getInternalCatalog() + .getDb(SystemInfoService.DEFAULT_CLUSTER + ":" + FeConstants.INTERNAL_DB_NAME); + if (!optionalDatabase.isPresent()) { + return false; + } + Database db = optionalDatabase.get(); + return db.getTable(StatisticConstants.STATISTIC_TBL_NAME).isPresent() + && db.getTable(StatisticConstants.ANALYSIS_JOB_TABLE).isPresent(); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 91cd500ddf..c43aef26eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -47,9 +47,12 @@ import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.qe.OriginStatement; import org.apache.doris.resource.Tag; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.MVAnalysisTask; +import org.apache.doris.statistics.OlapAnalysisTask; import org.apache.doris.system.Backend; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TCompressionType; @@ -997,8 +1000,11 @@ public class OlapTable extends Table { } @Override - public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { - return new AnalysisJob(scheduler, info); + public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) { + if (info.analysisType.equals(AnalysisType.COLUMN)) { + return new OlapAnalysisTask(scheduler, info); + } + return new MVAnalysisTask(scheduler, info); } @Override @@ -1939,4 +1945,5 @@ public class OlapTable extends Table { public Set getPartitionKeys() { return idToPartition.keySet(); } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java index 24c33bd837..56f02f9832 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Table.java @@ -26,9 +26,9 @@ import org.apache.doris.common.io.Writable; import org.apache.doris.common.util.SqlUtils; import org.apache.doris.common.util.Util; import org.apache.doris.external.hudi.HudiTable; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.thrift.TTableDescriptor; import com.google.common.base.Preconditions; @@ -508,7 +508,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf { } @Override - public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) { throw new NotImplementedException(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 361ef44ba2..6aebe1feb3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -20,9 +20,9 @@ package org.apache.doris.catalog; import org.apache.doris.alter.AlterCancelException; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.thrift.TTableDescriptor; import java.util.Collections; @@ -111,7 +111,7 @@ public interface TableIf { TTableDescriptor toThrift(); - AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info); + BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info); /** * Doris table type. diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java index eeca74ff5c..c4cb6aad00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalTable.java @@ -29,9 +29,9 @@ import org.apache.doris.datasource.ExternalCatalog; import org.apache.doris.datasource.ExternalSchemaCache; import org.apache.doris.persist.gson.GsonPostProcessable; import org.apache.doris.persist.gson.GsonUtils; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; import org.apache.doris.thrift.TTableDescriptor; import com.google.gson.annotations.SerializedName; @@ -301,7 +301,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable { } @Override - public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) { throw new NotImplementedException(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java index bfefe1f829..d3e991146f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java @@ -22,11 +22,11 @@ import org.apache.doris.catalog.Type; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.datasource.HMSExternalCatalog; import org.apache.doris.datasource.PooledHiveMetaStoreClient; -import org.apache.doris.statistics.AnalysisJob; -import org.apache.doris.statistics.AnalysisJobInfo; -import org.apache.doris.statistics.AnalysisJobScheduler; -import org.apache.doris.statistics.HiveAnalysisJob; -import org.apache.doris.statistics.IcebergAnalysisJob; +import org.apache.doris.statistics.AnalysisTaskInfo; +import org.apache.doris.statistics.AnalysisTaskScheduler; +import org.apache.doris.statistics.BaseAnalysisTask; +import org.apache.doris.statistics.HiveAnalysisTask; +import org.apache.doris.statistics.IcebergAnalysisTask; import org.apache.doris.thrift.THiveTable; import org.apache.doris.thrift.TTableDescriptor; import org.apache.doris.thrift.TTableType; @@ -268,13 +268,13 @@ public class HMSExternalTable extends ExternalTable { } @Override - public AnalysisJob createAnalysisJob(AnalysisJobScheduler scheduler, AnalysisJobInfo info) { + public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) { makeSureInitialized(); switch (dlaType) { case HIVE: - return new HiveAnalysisJob(scheduler, info); + return new HiveAnalysisTask(scheduler, info); case ICEBERG: - return new IcebergAnalysisJob(scheduler, info); + return new IcebergAnalysisTask(scheduler, info); default: throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported."); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 7a83d8db8d..38503d1263 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1791,12 +1791,13 @@ public class StmtExecutor implements ProfileWriter { public List executeInternalQuery() { try { + List resultRows = new ArrayList<>(); analyzer = new Analyzer(context.getEnv(), context); try { analyze(context.getSessionVariable().toThrift()); } catch (UserException e) { LOG.warn("Internal SQL execution failed, SQL: {}", originStmt, e); - return null; + return resultRows; } planner.getFragments(); RowBatch batch; @@ -1821,7 +1822,6 @@ public class StmtExecutor implements ProfileWriter { } Span fetchResultSpan = context.getTracer().spanBuilder("fetch internal SQL result") .setParent(Context.current()).startSpan(); - List resultRows = new ArrayList<>(); try (Scope scope = fetchResultSpan.makeCurrent()) { while (true) { batch = coord.getNext(); @@ -1834,7 +1834,7 @@ public class StmtExecutor implements ProfileWriter { } catch (Exception e) { LOG.warn("Unexpected exception when SQL running", e); fetchResultSpan.recordException(e); - return null; + return resultRows; } finally { fetchResultSpan.end(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java deleted file mode 100644 index 06fb36e644..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJob.java +++ /dev/null @@ -1,207 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.DatabaseIf; -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.Partition; -import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.FeConstants; -import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.qe.AutoCloseConnectContext; -import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; -import org.apache.doris.statistics.util.StatisticsUtil; - -import org.apache.commons.text.StringSubstitutor; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class AnalysisJob { - - private final AnalysisJobScheduler analysisJobScheduler; - - protected final AnalysisJobInfo info; - - protected CatalogIf catalog; - - protected DatabaseIf db; - - protected TableIf tbl; - - protected Column col; - - protected StmtExecutor stmtExecutor; - - public AnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - this.analysisJobScheduler = analysisJobScheduler; - this.info = info; - init(info); - } - - private void init(AnalysisJobInfo info) { - catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName); - if (catalog == null) { - analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED, - String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis()); - return; - } - db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null); - if (db == null) { - analysisJobScheduler.updateJobStatus(info.jobId, JobState.FAILED, - String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis()); - return; - } - tbl = (TableIf) db.getTable(info.tblName).orElse(null); - if (tbl == null) { - analysisJobScheduler.updateJobStatus( - info.jobId, JobState.FAILED, - String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis()); - } - col = tbl.getColumn(info.colName); - if (col == null) { - analysisJobScheduler.updateJobStatus( - info.jobId, JobState.FAILED, String.format("Column with name %s not exists", info.tblName), - System.currentTimeMillis()); - } - } - - private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT " - + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, " - + "${catalogId} AS catalog_id, " - + "${dbId} AS db_id, " - + "${tblId} AS tbl_id, " - + "'${colId}' AS col_id, " - + "${partId} AS part_id, " - + "COUNT(1) AS row_count, " - + "NDV(${colName}) AS ndv, " - + "SUM(CASE WHEN ${colName} IS NULL THEN 1 ELSE 0 END) AS null_count, " - + "MIN(${colName}) AS min, " - + "MAX(${colName}) AS max, " - + "${dataSizeFunction} AS data_size, " - + "NOW()" - + "FROM `${dbName}`.`${tblName}` " - + "PARTITION ${partName}"; - - private static final String ANALYZE_COLUMN_SQL_TEMPLATE = "INSERT INTO " - + "${internalDB}.${columnStatTbl}" - + " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, " - + " ndv, null_count, min, max, data_size, update_time\n" - + " FROM \n" - + " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, " - + " ${catalogId} AS catalog_id, " - + " ${dbId} AS db_id, " - + " ${tblId} AS tbl_id, " - + " '${colId}' AS col_id, " - + " NULL AS part_id, " - + " SUM(count) AS row_count, \n" - + " SUM(null_count) AS null_count, " - + " MIN(CAST(min AS ${type})) AS min, " - + " MAX(CAST(max AS ${type})) AS max, " - + " SUM(data_size_in_bytes) AS data_size, " - + " NOW() AS update_time\n" - + " FROM ${internalDB}.${columnStatTbl}" - + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND " - + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND " - + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND " - + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL" - + " ) t1, \n" - + " (SELECT NDV(${colName}) AS ndv FROM `${dbName}`.`${tblName}`) t2\n"; - - private String getDataSizeFunction() { - if (col.getType().isStringType()) { - return "SUM(LENGTH(${colName}))"; - } - return "COUNT(1) * " + col.getType().getSlotSize(); - } - - public void execute() throws Exception { - Map params = new HashMap<>(); - params.put("internalDB", FeConstants.INTERNAL_DB_NAME); - params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); - params.put("catalogId", String.valueOf(catalog.getId())); - params.put("dbId", String.valueOf(db.getId())); - params.put("tblId", String.valueOf(tbl.getId())); - params.put("colId", String.valueOf(info.colName)); - params.put("dataSizeFunction", getDataSizeFunction()); - params.put("dbName", info.dbName); - params.put("colName", String.valueOf(info.colName)); - params.put("tblName", String.valueOf(info.tblName)); - List partitionAnalysisSQLs = new ArrayList<>(); - try { - tbl.readLock(); - Set partNames = ((Table) tbl).getPartitionNames(); - for (String partName : partNames) { - Partition part = ((Table) tbl).getPartition(partName); - if (part == null) { - continue; - } - params.put("partId", String.valueOf(((Table) tbl).getPartition(partName).getId())); - params.put("partName", String.valueOf(partName)); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); - } - } finally { - tbl.readUnlock(); - } - for (String sql : partitionAnalysisSQLs) { - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - } - } - params.remove("partId"); - params.put("type", col.getType().toString()); - StringSubstitutor stringSubstitutor = new StringSubstitutor(params); - String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); - try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { - this.stmtExecutor = new StmtExecutor(r.connectContext, sql); - this.stmtExecutor.execute(); - Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); - } - } - - public int getLastExecTime() { - return info.lastExecTimeInMs; - } - - public void cancel() { - if (stmtExecutor != null) { - stmtExecutor.cancel(); - } - analysisJobScheduler - .updateJobStatus(info.jobId, JobState.FAILED, - String.format("Job has been cancelled: %s", info.toString()), -1); - } - - public void updateState(JobState jobState) { - info.updateState(jobState); - } - - public long getJobId() { - return info.jobId; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java new file mode 100644 index 0000000000..3833acc20f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.AnalyzeStmt; +import org.apache.doris.analysis.TableName; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.TableIf.TableType; +import org.apache.doris.common.FeConstants; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; +import org.apache.doris.statistics.util.StatisticsUtil; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.text.StringSubstitutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +public class AnalysisManager { + + private static final Logger LOG = LogManager.getLogger(AnalysisManager.class); + + private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE " + + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " " + + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}"; + + private final ConcurrentMap> analysisJobIdToTaskMap; + + public final AnalysisTaskScheduler taskScheduler; + + private StatisticsCache statisticsCache; + + private final AnalysisTaskExecutor taskExecutor; + + public AnalysisManager() { + analysisJobIdToTaskMap = new ConcurrentHashMap<>(); + this.taskScheduler = new AnalysisTaskScheduler(); + taskExecutor = new AnalysisTaskExecutor(taskScheduler); + this.statisticsCache = new StatisticsCache(); + taskExecutor.start(); + } + + public void createAnalysisJob(AnalyzeStmt analyzeStmt) { + String catalogName = analyzeStmt.getCatalogName(); + String db = analyzeStmt.getDBName(); + TableName tbl = analyzeStmt.getTblName(); + StatisticsUtil.convertTableNameToObjects(tbl); + List colNames = analyzeStmt.getOptColumnNames(); + Map analysisTaskInfos = new HashMap<>(); + long jobId = Env.getCurrentEnv().getNextId(); + if (colNames != null) { + for (String colName : colNames) { + long taskId = Env.getCurrentEnv().getNextId(); + AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId) + .setTaskId(taskId).setCatalogName(catalogName).setDbName(db) + .setTblName(tbl.getTbl()).setColName(colName).setJobType(JobType.MANUAL) + .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.COLUMN) + .setScheduleType(ScheduleType.ONCE).build(); + try { + StatisticsRepository.createAnalysisTask(analysisTaskInfo); + } catch (Exception e) { + throw new RuntimeException("Failed to create analysis job", e); + } + analysisTaskInfos.put(taskId, analysisTaskInfo); + } + } + if (analyzeStmt.wholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) { + OlapTable olapTable = (OlapTable) analyzeStmt.getTable(); + try { + olapTable.readLock(); + for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) { + if (meta.getDefineStmt() == null) { + continue; + } + long taskId = Env.getCurrentEnv().getNextId(); + AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId( + jobId).setTaskId(taskId) + .setCatalogName(catalogName).setDbName(db) + .setTblName(tbl.getTbl()).setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL) + .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX) + .setScheduleType(ScheduleType.ONCE).build(); + try { + StatisticsRepository.createAnalysisTask(analysisTaskInfo); + } catch (Exception e) { + throw new RuntimeException("Failed to create analysis job", e); + } + analysisTaskInfos.put(taskId, analysisTaskInfo); + } + } finally { + olapTable.readUnlock(); + } + } + analysisJobIdToTaskMap.put(jobId, analysisTaskInfos); + analysisTaskInfos.values().forEach(taskScheduler::schedule); + } + + public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) { + Map params = new HashMap<>(); + params.put("jobState", jobState.toString()); + params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : ""); + params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time); + params.put("jobId", String.valueOf(info.jobId)); + try { + StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); + } catch (Exception e) { + LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e); + } finally { + info.state = jobState; + if (analysisJobIdToTaskMap.get(info.jobId).values() + .stream().allMatch(i -> i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) { + analysisJobIdToTaskMap.remove(info.jobId); + } + + } + } + + public StatisticsCache getStatisticsCache() { + return statisticsCache; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java new file mode 100644 index 0000000000..bab8a462e8 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisState.java @@ -0,0 +1,25 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +public enum AnalysisState { + PENDING, + RUNNING, + FINISHED, + FAILED; +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java similarity index 75% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java index c73bf349b8..ff98890cf7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskExecutor.java @@ -17,10 +17,10 @@ package org.apache.doris.statistics; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.ThreadPoolManager.BlockedPolicy; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; import org.apache.doris.statistics.util.BlockingCounter; import org.apache.logging.log4j.LogManager; @@ -33,9 +33,9 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -public class AnalysisJobExecutor extends Thread { +public class AnalysisTaskExecutor extends Thread { - private static final Logger LOG = LogManager.getLogger(AnalysisJobExecutor.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class); private final ThreadPoolExecutor executors = ThreadPoolManager.newDaemonThreadPool( Config.statistics_simultaneously_running_job_num, @@ -44,17 +44,17 @@ public class AnalysisJobExecutor extends Thread { new BlockedPolicy("Analysis Job Executor", Integer.MAX_VALUE), "Analysis Job Executor", true); - private final AnalysisJobScheduler jobScheduler; + private final AnalysisTaskScheduler taskScheduler; private final BlockingCounter blockingCounter = new BlockingCounter(Config.statistics_simultaneously_running_job_num); - private final BlockingQueue jobQueue = - new PriorityBlockingQueue(20, - Comparator.comparingLong(AnalysisJobWrapper::getStartTime)); + private final BlockingQueue jobQueue = + new PriorityBlockingQueue(20, + Comparator.comparingLong(AnalysisTaskWrapper::getStartTime)); - public AnalysisJobExecutor(AnalysisJobScheduler jobExecutor) { - this.jobScheduler = jobExecutor; + public AnalysisTaskExecutor(AnalysisTaskScheduler jobExecutor) { + this.taskScheduler = jobExecutor; } @Override @@ -73,12 +73,12 @@ public class AnalysisJobExecutor extends Thread { private void doCancelExpiredJob() { for (;;) { try { - AnalysisJobWrapper jobWrapper = jobQueue.take(); + AnalysisTaskWrapper taskWrapper = jobQueue.take(); try { long timeout = StatisticConstants.STATISTICS_TASKS_TIMEOUT_IN_MS; - jobWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); + taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS); } catch (Exception e) { - jobWrapper.cancel(); + taskWrapper.cancel(); } } catch (Throwable throwable) { LOG.warn(throwable); @@ -101,11 +101,13 @@ public class AnalysisJobExecutor extends Thread { } private void doFetchAndExecute() { - AnalysisJob job = jobScheduler.getPendingJobs(); - AnalysisJobWrapper jobWrapper = new AnalysisJobWrapper(this, job); + BaseAnalysisTask task = taskScheduler.getPendingTasks(); + AnalysisTaskWrapper jobWrapper = new AnalysisTaskWrapper(this, task); incr(); - jobScheduler.updateJobStatus(job.getJobId(), JobState.RUNNING, "", -1); executors.submit(jobWrapper); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.RUNNING, "", System.currentTimeMillis()); } public void decr() { @@ -116,7 +118,7 @@ public class AnalysisJobExecutor extends Thread { blockingCounter.incr(); } - public void putJob(AnalysisJobWrapper wrapper) throws Exception { + public void putJob(AnalysisTaskWrapper wrapper) throws Exception { jobQueue.put(wrapper); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java similarity index 65% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java index da8fa4cede..6c2243b261 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfo.java @@ -20,23 +20,21 @@ package org.apache.doris.statistics; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.Objects; import java.util.StringJoiner; -public class AnalysisJobInfo { +public class AnalysisTaskInfo { - private static final Logger LOG = LogManager.getLogger(AnalysisJobInfo.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskInfo.class); - public enum JobState { - PENDING, - RUNNING, - FINISHED, - FAILED; + + public enum AnalysisMethod { + SAMPLE, + FULL } public enum AnalysisType { - SAMPLE, - FULL; + COLUMN, + INDEX } public enum JobType { @@ -53,6 +51,8 @@ public class AnalysisJobInfo { public final long jobId; + public final long taskId; + public final String catalogName; public final String dbName; @@ -61,56 +61,43 @@ public class AnalysisJobInfo { public final String colName; + public final Long indexId; + public final JobType jobType; - public AnalysisType analysisType; + public final AnalysisMethod analysisMethod; + + public final AnalysisType analysisType; public String message; // finished or failed public int lastExecTimeInMs = 0; - private JobState state; + public AnalysisState state; public final ScheduleType scheduleType; - public AnalysisJobInfo(long jobId, String catalogName, String dbName, String tblName, String colName, - JobType jobType, ScheduleType scheduleType) { + public AnalysisTaskInfo(long jobId, long taskId, String catalogName, String dbName, String tblName, + String colName, Long indexId, JobType jobType, + AnalysisMethod analysisMethod, AnalysisType analysisType, String message, int lastExecTimeInMs, + AnalysisState state, ScheduleType scheduleType) { this.jobId = jobId; + this.taskId = taskId; this.catalogName = catalogName; this.dbName = dbName; this.tblName = tblName; this.colName = colName; + this.indexId = indexId; this.jobType = jobType; + this.analysisMethod = analysisMethod; + this.analysisType = analysisType; + this.message = message; + this.lastExecTimeInMs = lastExecTimeInMs; + this.state = state; this.scheduleType = scheduleType; } - @Override - public int hashCode() { - return Objects.hash(catalogName, dbName, tblName, colName, analysisType); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null || getClass() != obj.getClass()) { - return false; - } - AnalysisJobInfo other = (AnalysisJobInfo) obj; - return catalogName.equals(other.catalogName) - && dbName.equals(other.dbName) - && tblName.equals(other.tblName) - && colName.equals(other.colName) - && analysisType.equals(other.analysisType); - } - - // TODO: log to meta - public void updateState(JobState jobState) { - this.state = jobState; - } - @Override public String toString() { StringJoiner sj = new StringJoiner("\n", getClass().getName() + ":\n", "\n"); @@ -119,14 +106,15 @@ public class AnalysisJobInfo { sj.add("DBName: " + dbName); sj.add("TableName: " + tblName); sj.add("ColumnName: " + colName); - sj.add("JobType: " + analysisType.toString()); + sj.add("TaskType: " + analysisType.toString()); + sj.add("TaskMethod: " + analysisMethod.toString()); sj.add("Message: " + message); sj.add("LastExecTime: " + String.valueOf(lastExecTimeInMs)); sj.add("CurrentState: " + state.toString()); return sj.toString(); } - public JobState getState() { + public AnalysisState getState() { return state; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java new file mode 100644 index 0000000000..cc3b7b62f1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskInfoBuilder.java @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.AnalysisTaskInfo.ScheduleType; + +public class AnalysisTaskInfoBuilder { + private long jobId; + private long taskId; + private String catalogName; + private String dbName; + private String tblName; + private String colName; + private Long indexId; + private JobType jobType; + private AnalysisMethod analysisMethod; + private AnalysisType analysisType; + private String message; + private int lastExecTimeInMs; + private AnalysisState state; + private ScheduleType scheduleType; + + public AnalysisTaskInfoBuilder setJobId(long jobId) { + this.jobId = jobId; + return this; + } + + public AnalysisTaskInfoBuilder setTaskId(long taskId) { + this.taskId = taskId; + return this; + } + + public AnalysisTaskInfoBuilder setCatalogName(String catalogName) { + this.catalogName = catalogName; + return this; + } + + public AnalysisTaskInfoBuilder setDbName(String dbName) { + this.dbName = dbName; + return this; + } + + public AnalysisTaskInfoBuilder setTblName(String tblName) { + this.tblName = tblName; + return this; + } + + public AnalysisTaskInfoBuilder setColName(String colName) { + this.colName = colName; + return this; + } + + public AnalysisTaskInfoBuilder setIndexId(Long indexId) { + this.indexId = indexId; + return this; + } + + public AnalysisTaskInfoBuilder setJobType(JobType jobType) { + this.jobType = jobType; + return this; + } + + public AnalysisTaskInfoBuilder setAnalysisMethod(AnalysisMethod analysisMethod) { + this.analysisMethod = analysisMethod; + return this; + } + + public AnalysisTaskInfoBuilder setAnalysisType(AnalysisType analysisType) { + this.analysisType = analysisType; + return this; + } + + public AnalysisTaskInfoBuilder setMessage(String message) { + this.message = message; + return this; + } + + public AnalysisTaskInfoBuilder setLastExecTimeInMs(int lastExecTimeInMs) { + this.lastExecTimeInMs = lastExecTimeInMs; + return this; + } + + public AnalysisTaskInfoBuilder setState(AnalysisState state) { + this.state = state; + return this; + } + + public AnalysisTaskInfoBuilder setScheduleType(ScheduleType scheduleType) { + this.scheduleType = scheduleType; + return this; + } + + public AnalysisTaskInfo build() { + return new AnalysisTaskInfo(jobId, taskId, catalogName, dbName, tblName, colName, indexId, jobType, + analysisMethod, analysisType, message, lastExecTimeInMs, state, scheduleType); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java similarity index 54% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java index 0918339d35..71f23fe955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskScheduler.java @@ -20,94 +20,63 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.DatabaseIf; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.TableIf; -import org.apache.doris.common.FeConstants; import org.apache.doris.datasource.CatalogIf; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; -import org.apache.doris.statistics.AnalysisJobInfo.JobType; -import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import com.google.common.base.Preconditions; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.text.StringSubstitutor; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import java.util.Set; -public class AnalysisJobScheduler { +public class AnalysisTaskScheduler { - private static final Logger LOG = LogManager.getLogger(AnalysisJobScheduler.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskScheduler.class); - private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE " - + FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " " - + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}"; + private final PriorityQueue systemJobQueue = + new PriorityQueue<>(Comparator.comparingInt(BaseAnalysisTask::getLastExecTime)); - private final PriorityQueue systemJobQueue = - new PriorityQueue(Comparator.comparingInt(AnalysisJob::getLastExecTime)); + private final Queue manualJobQueue = new LinkedList<>(); - private final Queue manualJobQueue = new LinkedList<>(); + private final Set systemJobSet = new HashSet<>(); - private final Set systemJobSet = new HashSet<>(); + private final Set manualJobSet = new HashSet<>(); - private final Set manualJobSet = new HashSet<>(); - - private final AnalysisJobExecutor jobExecutor = new AnalysisJobExecutor(this); - - { - jobExecutor.start(); - } - - public void updateJobStatus(long jobId, JobState jobState, String message, long time) { - Map params = new HashMap<>(); - params.put("jobState", jobState.toString()); - params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : ""); - params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time); - params.put("jobId", String.valueOf(jobId)); - try { - StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE)); - } catch (Exception e) { - LOG.warn(String.format("Failed to update state for job: %s", jobId), e); - } - - } - - public synchronized void scheduleJobs(List analysisJobInfos) { - for (AnalysisJobInfo job : analysisJobInfos) { + public synchronized void scheduleJobs(List analysisJobInfos) { + for (AnalysisTaskInfo job : analysisJobInfos) { schedule(job); } } - public synchronized void schedule(AnalysisJobInfo analysisJobInfo) { + public synchronized void schedule(AnalysisTaskInfo analysisJobInfo) { CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(analysisJobInfo.catalogName); Preconditions.checkArgument(catalog != null); DatabaseIf db = catalog.getDbNullable(analysisJobInfo.dbName); Preconditions.checkArgument(db != null); TableIf table = db.getTableNullable(analysisJobInfo.tblName); Preconditions.checkArgument(table != null); - AnalysisJob analysisJob = table.createAnalysisJob(this, analysisJobInfo); - addToManualJobQueue(analysisJob); + BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisJobInfo); + addToManualJobQueue(analysisTask); if (analysisJobInfo.jobType.equals(JobType.MANUAL)) { return; } - addToSystemQueue(analysisJob); + addToSystemQueue(analysisTask); } - private void removeFromSystemQueue(AnalysisJob analysisJobInfo) { + private void removeFromSystemQueue(BaseAnalysisTask analysisJobInfo) { if (manualJobSet.contains(analysisJobInfo)) { systemJobQueue.remove(analysisJobInfo); manualJobSet.remove(analysisJobInfo); } } - private void addToSystemQueue(AnalysisJob analysisJobInfo) { + private void addToSystemQueue(BaseAnalysisTask analysisJobInfo) { if (systemJobSet.contains(analysisJobInfo)) { return; } @@ -116,7 +85,7 @@ public class AnalysisJobScheduler { notify(); } - private void addToManualJobQueue(AnalysisJob analysisJobInfo) { + private void addToManualJobQueue(BaseAnalysisTask analysisJobInfo) { if (manualJobSet.contains(analysisJobInfo)) { return; } @@ -125,7 +94,7 @@ public class AnalysisJobScheduler { notify(); } - public synchronized AnalysisJob getPendingJobs() { + public synchronized BaseAnalysisTask getPendingTasks() { while (true) { if (!manualJobQueue.isEmpty()) { return manualJobQueue.poll(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java similarity index 63% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java index 244a53b187..3ca55dbd9e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisJobWrapper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisTaskWrapper.java @@ -18,60 +18,66 @@ package org.apache.doris.statistics; import org.apache.doris.catalog.Env; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.concurrent.FutureTask; -public class AnalysisJobWrapper extends FutureTask { +public class AnalysisTaskWrapper extends FutureTask { - private static final Logger LOG = LogManager.getLogger(AnalysisJobWrapper.class); + private static final Logger LOG = LogManager.getLogger(AnalysisTaskWrapper.class); - private final AnalysisJob job; + private final BaseAnalysisTask task; private long startTime; - private final AnalysisJobExecutor executor; + private final AnalysisTaskExecutor executor; - public AnalysisJobWrapper(AnalysisJobExecutor executor, AnalysisJob job) { + public AnalysisTaskWrapper(AnalysisTaskExecutor executor, BaseAnalysisTask job) { super(() -> { job.execute(); return null; }); this.executor = executor; - this.job = job; + this.task = job; } @Override public void run() { startTime = System.currentTimeMillis(); - Exception except = null; + Throwable except = null; try { executor.putJob(this); super.run(); + Object result = get(); + if (result instanceof Throwable) { + except = (Throwable) result; + } } catch (Exception e) { except = e; } finally { executor.decr(); if (except != null) { - Env.getCurrentEnv().getAnalysisJobScheduler() - .updateJobStatus(job.getJobId(), JobState.FAILED, except.getMessage(), -1); + LOG.warn("Failed to execute task", except); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FAILED, except.getMessage(), -1); } else { - Env.getCurrentEnv().getAnalysisJobScheduler() - .updateJobStatus(job.getJobId(), JobState.FINISHED, "", System.currentTimeMillis()); + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(task.info, + AnalysisState.FINISHED, "", System.currentTimeMillis()); } - LOG.warn("{} finished, cost time:{}", job.toString(), System.currentTimeMillis() - startTime); + LOG.warn("{} finished, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); } } public boolean cancel() { try { - LOG.warn("{} cancelled, cost time:{}", job.toString(), System.currentTimeMillis() - startTime); - job.cancel(); + LOG.warn("{} cancelled, cost time:{}", task.toString(), System.currentTimeMillis() - startTime); + task.cancel(); } catch (Exception e) { - LOG.warn(String.format("Cancel job failed job info : %s", job.toString())); + LOG.warn(String.format("Cancel job failed job info : %s", task.toString())); } finally { executor.decr(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java new file mode 100644 index 0000000000..79b0ec8c16 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/BaseAnalysisTask.java @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; + +import com.google.common.annotations.VisibleForTesting; + +public abstract class BaseAnalysisTask { + + protected static final String INSERT_PART_STATISTICS = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT " + + "CONCAT(${tblId}, '-', '${colId}', '-', ${partId}) AS id, " + + "${catalogId} AS catalog_id, " + + "${dbId} AS db_id, " + + "${tblId} AS tbl_id, " + + "'${colId}' AS col_id, " + + "${partId} AS part_id, " + + "COUNT(1) AS row_count, " + + "NDV(`${colName}`) AS ndv, " + + "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, " + + "MIN(`${colName}`) AS min, " + + "MAX(`${colName}`) AS max, " + + "${dataSizeFunction} AS data_size, " + + "NOW() "; + + protected static final String INSERT_COL_STATISTICS = "INSERT INTO " + + "${internalDB}.${columnStatTbl}" + + " SELECT id, catalog_id, db_id, tbl_id, col_id, part_id, row_count, " + + " ndv, null_count, min, max, data_size, update_time\n" + + " FROM \n" + + " (SELECT CONCAT(${tblId}, '-', '${colId}') AS id, " + + " ${catalogId} AS catalog_id, " + + " ${dbId} AS db_id, " + + " ${tblId} AS tbl_id, " + + " '${colId}' AS col_id, " + + " NULL AS part_id, " + + " SUM(count) AS row_count, \n" + + " SUM(null_count) AS null_count, " + + " MIN(CAST(min AS ${type})) AS min, " + + " MAX(CAST(max AS ${type})) AS max, " + + " SUM(data_size_in_bytes) AS data_size, " + + " NOW() AS update_time\n" + + " FROM ${internalDB}.${columnStatTbl}" + + " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND " + + " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND " + + " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND " + + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL" + + " ) t1, \n"; + + protected AnalysisTaskScheduler analysisTaskScheduler; + + protected AnalysisTaskInfo info; + + protected CatalogIf catalog; + + protected DatabaseIf db; + + protected TableIf tbl; + + protected Column col; + + protected StmtExecutor stmtExecutor; + + protected AnalysisState analysisState; + + @VisibleForTesting + public BaseAnalysisTask() { + + } + + public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + this.analysisTaskScheduler = analysisTaskScheduler; + this.info = info; + init(info); + } + + private void init(AnalysisTaskInfo info) { + catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName); + if (catalog == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED, + String.format("Catalog with name: %s not exists", info.dbName), System.currentTimeMillis()); + return; + } + db = (DatabaseIf) catalog.getDb(info.dbName).orElse(null); + if (db == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED, + String.format("DB with name %s not exists", info.dbName), System.currentTimeMillis()); + return; + } + tbl = (TableIf) db.getTable(info.tblName).orElse(null); + if (tbl == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus( + info, AnalysisState.FAILED, + String.format("Table with name %s not exists", info.tblName), System.currentTimeMillis()); + } + if (info.analysisType != null && info.analysisType.equals(AnalysisType.COLUMN)) { + col = tbl.getColumn(info.colName); + if (col == null) { + Env.getCurrentEnv().getAnalysisManager().updateTaskStatus( + info, AnalysisState.FAILED, String.format("Column with name %s not exists", info.tblName), + System.currentTimeMillis()); + } + } + + } + + public abstract void execute() throws Exception; + + public void cancel() { + if (stmtExecutor != null) { + stmtExecutor.cancel(); + } + Env.getCurrentEnv().getAnalysisManager() + .updateTaskStatus(info, AnalysisState.FAILED, + String.format("Job has been cancelled: %s", info.toString()), -1); + } + + public int getLastExecTime() { + return info.lastExecTimeInMs; + } + + public long getJobId() { + return info.jobId; + } + + public AnalysisState getAnalysisState() { + return analysisState; + } + + protected String getDataSizeFunction(Column column) { + if (column.getType().isStringType()) { + return "SUM(LENGTH(`${colName}`))"; + } + return "COUNT(1) * " + column.getType().getSlotSize(); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java similarity index 89% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java index c92d92dac9..9a6dc93564 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HMSAnalysisTask.java @@ -22,12 +22,12 @@ import org.apache.doris.common.Config; import org.apache.commons.lang.NotImplementedException; -public class HMSAnalysisJob extends AnalysisJob { +public class HMSAnalysisTask extends BaseAnalysisTask { protected HMSExternalTable table; - public HMSAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public HMSAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); table = (HMSExternalTable) tbl; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java similarity index 97% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java index 216cafcb89..7bba21c692 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HiveAnalysisTask.java @@ -43,16 +43,16 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -public class HiveAnalysisJob extends HMSAnalysisJob { - private static final Logger LOG = LogManager.getLogger(HiveAnalysisJob.class); +public class HiveAnalysisTask extends HMSAnalysisTask { + private static final Logger LOG = LogManager.getLogger(HiveAnalysisTask.class); public static final String TOTAL_SIZE = "totalSize"; public static final String NUM_ROWS = "numRows"; public static final String NUM_FILES = "numFiles"; public static final String TIMESTAMP = "transient_lastDdlTime"; - public HiveAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public HiveAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); } private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java similarity index 96% rename from fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java rename to fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java index a62052f8bf..b51fa4eb53 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/IcebergAnalysisTask.java @@ -38,14 +38,14 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -public class IcebergAnalysisJob extends HMSAnalysisJob { +public class IcebergAnalysisTask extends HMSAnalysisTask { private long numRows = 0; private long dataSize = 0; private long numNulls = 0; - public IcebergAnalysisJob(AnalysisJobScheduler analysisJobScheduler, AnalysisJobInfo info) { - super(analysisJobScheduler, info); + public IcebergAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); } private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO " diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java new file mode 100644 index 0000000000..70f4d0c5f3 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/MVAnalysisTask.java @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.analysis.CreateMaterializedViewStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.FunctionCallExpr; +import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.SelectListItem; +import org.apache.doris.analysis.SelectStmt; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.TableRef; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.MaterializedIndexMeta; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.base.Preconditions; + +import java.io.StringReader; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +/** + * Analysis for the materialized view, only gets constructed when the AnalyzeStmt is not set which + * columns to be analyzed. + * TODO: Supports multi-table mv + */ +public class MVAnalysisTask extends BaseAnalysisTask { + + private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS + + " FROM (${sql}) mv"; + + private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS + + " (SELECT NDV(`${colName}`) AS ndv " + + " FROM (${sql}) mv) t2\n"; + + private MaterializedIndexMeta meta; + + private SelectStmt selectStmt; + + private OlapTable olapTable; + + public MVAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); + init(); + } + + private void init() { + olapTable = (OlapTable) tbl; + meta = olapTable.getIndexMetaByIndexId(info.indexId); + Preconditions.checkState(meta != null); + String mvDef = meta.getDefineStmt().originStmt; + SqlScanner input = + new SqlScanner(new StringReader(mvDef), 0L); + SqlParser parser = new SqlParser(input); + CreateMaterializedViewStmt cmv = null; + try { + cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0); + } catch (Exception e) { + throw new RuntimeException(e); + } + selectStmt = cmv.getSelectStmt(); + selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName()); + } + + @Override + public void execute() throws Exception { + for (Column column : meta.getSchema()) { + SelectStmt selectOne = (SelectStmt) selectStmt.clone(); + TableRef tableRef = selectOne.getTableRefs().get(0); + SelectListItem selectItem = selectOne.getSelectList().getItems() + .stream() + .filter(i -> isCorrespondingToColumn(i, column)) + .findFirst() + .get(); + selectItem.setAlias(column.getName()); + Map params = new HashMap<>(); + for (Partition part : olapTable.getAllPartitions()) { + String partName = part.getName(); + PartitionNames partitionName = new PartitionNames(false, Arrays.asList(partName)); + tableRef.setPartitionNames(partitionName); + String sql = selectOne.toSql(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(meta.getIndexId())); + String colName = column.getName(); + params.put("colId", colName); + long partId = part.getId(); + params.put("partId", String.valueOf(partId)); + params.put("dataSizeFunction", getDataSizeFunction(column)); + params.put("dbName", info.dbName); + params.put("colName", colName); + params.put("tblName", String.valueOf(info.tblName)); + params.put("sql", sql); + StatisticsUtil.execUpdate(ANALYZE_MV_PART, params); + } + params.remove("partId"); + params.put("type", column.getType().toString()); + StatisticsUtil.execUpdate(ANALYZE_MV_COL, params); + Env.getCurrentEnv().getStatisticsCache().refreshSync(meta.getIndexId(), column.getName()); + } + } + + // Based on the fact that materialized view create statement's select expr only contains basic SlotRef and + // AggregateFunction. + private boolean isCorrespondingToColumn(SelectListItem item, Column column) { + Expr expr = item.getExpr(); + if (expr instanceof SlotRef) { + SlotRef slotRef = (SlotRef) expr; + return slotRef.getColumnName().equalsIgnoreCase(column.getName()); + } + if (expr instanceof FunctionCallExpr) { + FunctionCallExpr func = (FunctionCallExpr) expr; + SlotRef slotRef = (SlotRef) func.getChild(0); + return slotRef.getColumnName().equalsIgnoreCase(column.getName()); + } + return false; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java new file mode 100644 index 0000000000..23ea5ea6a6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/OlapAnalysisTask.java @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.Partition; +import org.apache.doris.common.FeConstants; +import org.apache.doris.qe.AutoCloseConnectContext; +import org.apache.doris.qe.StmtExecutor; +import org.apache.doris.statistics.util.StatisticsUtil; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.text.StringSubstitutor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Each task analyze one column. + */ +public class OlapAnalysisTask extends BaseAnalysisTask { + + private static final String ANALYZE_PARTITION_SQL_TEMPLATE = INSERT_PART_STATISTICS + + "FROM `${dbName}`.`${tblName}` " + + "PARTITION ${partName}"; + + private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS + + " (SELECT NDV(`${colName}`) AS ndv " + + " FROM `${dbName}`.`${tblName}`) t2\n"; + + @VisibleForTesting + public OlapAnalysisTask() { + super(); + } + + public OlapAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) { + super(analysisTaskScheduler, info); + } + + public void execute() throws Exception { + Map params = new HashMap<>(); + params.put("internalDB", FeConstants.INTERNAL_DB_NAME); + params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME); + params.put("catalogId", String.valueOf(catalog.getId())); + params.put("dbId", String.valueOf(db.getId())); + params.put("tblId", String.valueOf(tbl.getId())); + params.put("colId", String.valueOf(info.colName)); + params.put("dataSizeFunction", getDataSizeFunction(col)); + params.put("dbName", info.dbName); + params.put("colName", String.valueOf(info.colName)); + params.put("tblName", String.valueOf(info.tblName)); + List partitionAnalysisSQLs = new ArrayList<>(); + try { + tbl.readLock(); + Set partNames = tbl.getPartitionNames(); + for (String partName : partNames) { + Partition part = tbl.getPartition(partName); + if (part == null) { + continue; + } + params.put("partId", String.valueOf(tbl.getPartition(partName).getId())); + params.put("partName", String.valueOf(partName)); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + partitionAnalysisSQLs.add(stringSubstitutor.replace(ANALYZE_PARTITION_SQL_TEMPLATE)); + } + } finally { + tbl.readUnlock(); + } + execSQLs(partitionAnalysisSQLs); + params.remove("partId"); + params.put("type", col.getType().toString()); + StringSubstitutor stringSubstitutor = new StringSubstitutor(params); + String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE); + execSQL(sql); + Env.getCurrentEnv().getStatisticsCache().refreshSync(tbl.getId(), col.getName()); + } + + @VisibleForTesting + public void execSQLs(List partitionAnalysisSQLs) throws Exception { + for (String sql : partitionAnalysisSQLs) { + execSQL(sql); + } + } + + @VisibleForTesting + public void execSQL(String sql) throws Exception { + try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) { + this.stmtExecutor = new StmtExecutor(r.connectContext, sql); + this.stmtExecutor.execute(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java index c369da43b9..62cc5638ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJob.java @@ -17,7 +17,6 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; @@ -260,21 +259,6 @@ public class StatisticsJob { } } - /** - * get statisticsJob from analyzeStmt. - * AnalyzeStmt: analyze t1(c1,c2,c3) - * tableId: [t1] - * tableIdToColumnName: {t1: [c1,c2,c3]} - */ - public static StatisticsJob fromAnalyzeStmt(AnalyzeStmt stmt) throws AnalysisException { - long dbId = stmt.getDbId(); - Set tblIds = stmt.getTblIds(); - Map> tableIdToPartitionName = stmt.getTableIdToPartitionName(); - Map> tableIdToColumnName = stmt.getTableIdToColumnName(); - Map properties = stmt.getProperties(); - return new StatisticsJob(dbId, tblIds, tableIdToPartitionName, tableIdToColumnName, properties); - } - public List getShowInfo(@Nullable Long tableId) throws AnalysisException { List result = Lists.newArrayList(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java index 37966dc1fe..dbdc202f0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsJobManager.java @@ -17,20 +17,16 @@ package org.apache.doris.statistics; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.ShowAnalyzeStmt; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; -import org.apache.doris.common.UserException; import org.apache.doris.common.util.ListComparator; import org.apache.doris.common.util.OrderByPair; -import org.apache.doris.qe.ConnectContext; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -61,37 +57,6 @@ public class StatisticsJobManager { return idToStatisticsJob; } - public void createStatisticsJob(AnalyzeStmt analyzeStmt) throws UserException { - // The current statistics are only used for CBO test, - // and are not available to users. (work in progress) - // TODO(wzt): Further tests are needed - boolean enableCboStatistics = ConnectContext.get() - .getSessionVariable().getEnableCboStatistics(); - if (enableCboStatistics) { - // step1: init statistics job by analyzeStmt - StatisticsJob statisticsJob = StatisticsJob.fromAnalyzeStmt(analyzeStmt); - synchronized (this) { - // step2: check restrict - checkRestrict(analyzeStmt.getDbId(), statisticsJob.getTblIds()); - // step3: create it - createStatisticsJob(statisticsJob); - } - } else { - throw new UserException("Statistics are not yet stable, if you want to enable statistics," - + " use 'set enable_cbo_statistics=true' to enable it."); - } - } - - public void createStatisticsJob(StatisticsJob statisticsJob) throws DdlException { - idToStatisticsJob.put(statisticsJob.getId(), statisticsJob); - try { - Env.getCurrentEnv().getStatisticsJobScheduler().addPendingJob(statisticsJob); - } catch (IllegalStateException e) { - LOG.info("The pending statistics job is full. Please submit it again later."); - throw new DdlException("The pending statistics job is full, Please submit it again later."); - } - } - /** * The statistical job has the following restrict: * - Rule1: The same table cannot have two unfinished statistics jobs diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java index d1b9c8f4f4..715ec14464 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticsRepository.java @@ -18,16 +18,12 @@ package org.apache.doris.statistics; import org.apache.doris.analysis.AlterColumnStatsStmt; -import org.apache.doris.analysis.AnalyzeStmt; import org.apache.doris.analysis.TableName; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Partition; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.FeConstants; -import org.apache.doris.statistics.AnalysisJobInfo.AnalysisType; -import org.apache.doris.statistics.AnalysisJobInfo.JobState; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; import org.apache.doris.statistics.util.DBObjects; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; import org.apache.doris.statistics.util.StatisticsUtil; @@ -71,8 +67,9 @@ public class StatisticsRepository { + " WHERE `id` IN (${idList})"; private static final String PERSIST_ANALYSIS_JOB_SQL_TEMPLATE = "INSERT INTO " - + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, '${catalogName}', '${dbName}'," - + "'${tblName}','${colName}', '${jobType}', '${analysisType}', '${message}', '${lastExecTimeInMs}'," + + FULL_QUALIFIED_ANALYSIS_JOB_TABLE_NAME + " VALUES(${jobId}, ${taskId}, '${catalogName}', '${dbName}'," + + "'${tblName}','${colName}', ,'${indexId}','${jobType}', '${analysisType}', " + + "'${message}', '${lastExecTimeInMs}'," + "'${state}', '${scheduleType}')"; private static final String INSERT_INTO_COLUMN_STATISTICS = "INSERT INTO " @@ -134,39 +131,23 @@ public class StatisticsRepository { return stringJoiner.toString(); } - public static void createAnalysisJob(AnalyzeStmt analyzeStmt) { - String catalogName = analyzeStmt.getCatalogName(); - String db = analyzeStmt.getDBName(); - String tbl = analyzeStmt.getTblName(); - List colNames = analyzeStmt.getOptColumnNames(); - - if (colNames != null) { - for (String colName : colNames) { - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(Env.getCurrentEnv().getNextId(), catalogName, db, - tbl, colName, AnalysisJobInfo.JobType.MANUAL, ScheduleType.ONCE); - analysisJobInfo.analysisType = AnalysisType.FULL; - Map params = new HashMap<>(); - params.put("jobId", String.valueOf(analysisJobInfo.jobId)); - params.put("catalogName", analysisJobInfo.catalogName); - params.put("dbName", analysisJobInfo.dbName); - params.put("tblName", analysisJobInfo.tblName); - params.put("colName", analysisJobInfo.colName); - params.put("jobType", analysisJobInfo.jobType.toString()); - params.put("analysisType", analysisJobInfo.analysisType.toString()); - params.put("message", ""); - params.put("lastExecTimeInMs", "0"); - params.put("state", JobState.PENDING.toString()); - params.put("scheduleType", analysisJobInfo.scheduleType.toString()); - try { - StatisticsUtil.execUpdate( - new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE)); - } catch (Exception e) { - LOG.warn("Failed to persite job for column: {}", colName, e); - return; - } - Env.getCurrentEnv().getAnalysisJobScheduler().schedule(analysisJobInfo); - } - } + public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception { + Map params = new HashMap<>(); + params.put("jobId", String.valueOf(analysisTaskInfo.jobId)); + params.put("taskId", String.valueOf(analysisTaskInfo.taskId)); + params.put("catalogName", analysisTaskInfo.catalogName); + params.put("dbName", analysisTaskInfo.dbName); + params.put("tblName", analysisTaskInfo.tblName); + params.put("colName", analysisTaskInfo.colName); + params.put("indexId", String.valueOf(analysisTaskInfo.indexId)); + params.put("jobType", analysisTaskInfo.jobType.toString()); + params.put("analysisType", analysisTaskInfo.analysisMethod.toString()); + params.put("message", ""); + params.put("lastExecTimeInMs", "0"); + params.put("state", AnalysisState.PENDING.toString()); + params.put("scheduleType", analysisTaskInfo.scheduleType.toString()); + StatisticsUtil.execUpdate( + new StringSubstitutor(params).replace(PERSIST_ANALYSIS_JOB_SQL_TEMPLATE)); } public static void alterColumnStatistics(AlterColumnStatsStmt alterColumnStatsStmt) throws Exception { diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java index baa5ff1395..16a4a66ffc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/util/StatisticsUtil.java @@ -17,7 +17,6 @@ package org.apache.doris.statistics.util; - import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BoolLiteral; import org.apache.doris.analysis.DateLiteral; @@ -46,7 +45,7 @@ import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.SessionVariable; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisJobInfo; +import org.apache.doris.statistics.AnalysisTaskInfo; import org.apache.doris.statistics.ColumnStatistic; import org.apache.doris.statistics.StatisticConstants; import org.apache.doris.statistics.util.InternalQueryResult.ResultRow; @@ -90,13 +89,11 @@ public class StatisticsUtil { StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql); r.connectContext.setExecutor(stmtExecutor); stmtExecutor.execute(); - } finally { - ConnectContext.remove(); } } // TODO: finish this. - public static List deserializeToAnalysisJob(List resultBatches) throws TException { + public static List deserializeToAnalysisJob(List resultBatches) throws TException { return new ArrayList<>(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java index 019d4d0d90..9e0b8c2a46 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/SqlModeTest.java @@ -41,7 +41,7 @@ public class SqlModeTest { } catch (Exception e) { Assert.fail(e.getMessage()); } - Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); + Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); parser = new SqlParser(new SqlScanner(new StringReader(stmt), SqlModeHelper.MODE_DEFAULT)); try { @@ -49,7 +49,7 @@ public class SqlModeTest { } catch (Exception e) { Assert.fail(e.getMessage()); } - Assert.assertEquals("SELECT FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); + Assert.assertEquals("SELECT * FROM `db1`.`tbl1` WHERE `name` = 'BILL GATES'", selectStmt.toSql()); } @Test diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java index e2cc43a617..658b9d8ba1 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletRepairAndBalanceTest.java @@ -28,7 +28,6 @@ import org.apache.doris.catalog.ColocateTableIndex; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.MaterializedIndex; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; @@ -96,7 +95,6 @@ public class TabletRepairAndBalanceTest { static { try { - InternalSchemaInitializer.forTest = true; tag1 = Tag.create(Tag.TYPE_LOCATION, "zone1"); tag2 = Tag.create(Tag.TYPE_LOCATION, "zone2"); } catch (AnalysisException e) { @@ -106,6 +104,7 @@ public class TabletRepairAndBalanceTest { @BeforeClass public static void beforeClass() throws Exception { + FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; FeConstants.tablet_checker_interval_ms = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java index f7b5e0eda3..8d90e1a82e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/clone/TabletReplicaTooSlowTest.java @@ -21,7 +21,6 @@ import org.apache.doris.analysis.CreateDbStmt; import org.apache.doris.analysis.CreateTableStmt; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.TabletInvertedIndex; import org.apache.doris.common.Config; @@ -72,7 +71,7 @@ public class TabletReplicaTooSlowTest { @BeforeClass public static void beforeClass() throws Exception { - InternalSchemaInitializer.forTest = true; + FeConstants.runningUnitTest = true; System.out.println(runningDir); FeConstants.runningUnitTest = true; FeConstants.tablet_checker_interval_ms = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java index 25960fa5d4..95fb22eac6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/cluster/DecommissionBackendTest.java @@ -27,6 +27,7 @@ import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.ImmutableMap; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import java.util.List; @@ -38,6 +39,16 @@ public class DecommissionBackendTest extends TestWithFeService { return 3; } + @Override + protected void beforeCluster() { + FeConstants.runningUnitTest = true; + } + + @BeforeAll + public void beforeClass() { + FeConstants.runningUnitTest = true; + } + @Override protected void beforeCreatingConnectContext() throws Exception { FeConstants.default_scheduler_interval_millisecond = 1000; diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java index 691da60173..0c43ee6338 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/datasets/ssb/SSBTestBase.java @@ -17,9 +17,18 @@ package org.apache.doris.nereids.datasets.ssb; +import org.apache.doris.common.FeConstants; import org.apache.doris.nereids.datasets.tpch.AnalyzeCheckTestBase; +import org.junit.jupiter.api.BeforeAll; + public abstract class SSBTestBase extends AnalyzeCheckTestBase { + + @BeforeAll + public void beforeClass() { + FeConstants.runningUnitTest = true; + } + @Override protected void runBeforeAll() throws Exception { createDatabase("test"); diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java deleted file mode 100644 index e16f416368..0000000000 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobExecutorTest.java +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.statistics; - -import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; -import org.apache.doris.common.jmockit.Deencapsulation; -import org.apache.doris.statistics.AnalysisJobInfo.JobType; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; -import org.apache.doris.statistics.util.BlockingCounter; -import org.apache.doris.utframe.TestWithFeService; - -import mockit.Expectations; -import mockit.Mocked; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.BlockingQueue; - -public class AnalysisJobExecutorTest extends TestWithFeService { - - @Mocked - AnalysisJobScheduler analysisJobScheduler; - - @Override - protected void runBeforeAll() throws Exception { - try { - InternalSchemaInitializer.createDB(); - createDatabase("analysis_job_test"); - connectContext.setDatabase("default_cluster:analysis_job_test"); - createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" - - + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" - + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" - + ");"); - InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer(); - Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt()); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - @Test - public void testExpiredJobCancellation() throws Exception { - AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler); - BlockingQueue b = Deencapsulation.getField(analysisJobExecutor, "jobQueue"); - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - AnalysisJob analysisJob = new AnalysisJob(analysisJobScheduler, analysisJobInfo); - AnalysisJobWrapper analysisJobWrapper = new AnalysisJobWrapper(analysisJobExecutor, analysisJob); - Deencapsulation.setField(analysisJobWrapper, "startTime", 5); - b.put(analysisJobWrapper); - new Expectations() { - { - analysisJobWrapper.cancel(); - times = 1; - } - }; - analysisJobExecutor.start(); - BlockingCounter counter = Deencapsulation.getField(analysisJobExecutor, "blockingCounter"); - Assertions.assertEquals(0, counter.getVal()); - } - - @Test - public void testJobExecution() throws Exception { - AnalysisJobExecutor analysisJobExecutor = new AnalysisJobExecutor(analysisJobScheduler); - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - AnalysisJob job = new AnalysisJob(analysisJobScheduler, analysisJobInfo); - new Expectations() { - { - analysisJobScheduler.getPendingJobs(); - result = job; - job.execute(); - times = 1; - } - }; - Deencapsulation.invoke(analysisJobExecutor, "doFetchAndExecute"); - } -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java index dfe7617b60..a01dd8f7cf 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisJobTest.java @@ -22,8 +22,9 @@ import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.qe.AutoCloseConnectContext; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.StmtExecutor; -import org.apache.doris.statistics.AnalysisJobInfo.JobType; -import org.apache.doris.statistics.AnalysisJobInfo.ScheduleType; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; import org.apache.doris.statistics.util.StatisticsUtil; import org.apache.doris.utframe.TestWithFeService; @@ -54,10 +55,10 @@ public class AnalysisJobTest extends TestWithFeService { } @Test - public void testCreateAnalysisJob(@Mocked AnalysisJobScheduler scheduler) throws Exception { + public void testCreateAnalysisJob(@Mocked AnalysisTaskScheduler scheduler) throws Exception { new Expectations() { { - scheduler.schedule((AnalysisJobInfo) any); + scheduler.schedule((AnalysisTaskInfo) any); times = 3; } }; @@ -86,7 +87,7 @@ public class AnalysisJobTest extends TestWithFeService { } @Test - public void testJobExecution(@Mocked AnalysisJobScheduler scheduler, @Mocked StmtExecutor stmtExecutor) + public void testJobExecution(@Mocked AnalysisTaskScheduler scheduler, @Mocked StmtExecutor stmtExecutor) throws Exception { new MockUp() { @@ -105,13 +106,12 @@ public class AnalysisJobTest extends TestWithFeService { times = 2; } }; - AnalysisJobInfo analysisJobInfo = new AnalysisJobInfo(0, - "internal", - "default_cluster:analysis_job_test", - "t1", - "col1", JobType.MANUAL, - ScheduleType.ONCE); - new AnalysisJob(scheduler, analysisJobInfo).execute(); + AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + new OlapAnalysisTask(scheduler, analysisJobInfo).execute(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java new file mode 100644 index 0000000000..f9fcbaf55f --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/AnalysisTaskExecutorTest.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.InternalSchemaInitializer; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisMethod; +import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType; +import org.apache.doris.statistics.AnalysisTaskInfo.JobType; +import org.apache.doris.statistics.util.BlockingCounter; +import org.apache.doris.utframe.TestWithFeService; + +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.BlockingQueue; + +public class AnalysisTaskExecutorTest extends TestWithFeService { + + @Mocked + AnalysisTaskScheduler analysisTaskScheduler; + + @Override + protected void runBeforeAll() throws Exception { + try { + InternalSchemaInitializer.createDB(); + createDatabase("analysis_job_test"); + connectContext.setDatabase("default_cluster:analysis_job_test"); + createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" + + + "DISTRIBUTED BY HASH(col3)\n" + "BUCKETS 1\n" + + "PROPERTIES(\n" + " \"replication_num\"=\"1\"\n" + + ");"); + InternalSchemaInitializer storageInitializer = new InternalSchemaInitializer(); + Env.getCurrentEnv().createTable(storageInitializer.buildAnalysisJobTblStmt()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testExpiredJobCancellation() throws Exception { + AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); + BlockingQueue b = Deencapsulation.getField(analysisTaskExecutor, "jobQueue"); + AnalysisTaskInfo analysisJobInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisTaskScheduler, analysisJobInfo); + AnalysisTaskWrapper analysisTaskWrapper = new AnalysisTaskWrapper(analysisTaskExecutor, analysisJob); + Deencapsulation.setField(analysisTaskWrapper, "startTime", 5); + b.put(analysisTaskWrapper); + new Expectations() { + { + analysisTaskWrapper.cancel(); + times = 1; + } + }; + analysisTaskExecutor.start(); + BlockingCounter counter = Deencapsulation.getField(analysisTaskExecutor, "blockingCounter"); + Assertions.assertEquals(0, counter.getVal()); + } + + @Test + public void testTaskExecution() throws Exception { + AnalysisTaskExecutor analysisTaskExecutor = new AnalysisTaskExecutor(analysisTaskScheduler); + AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(0).setTaskId(0) + .setCatalogName("internal").setDbName("default_cluster:analysis_job_test").setTblName("t1") + .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType( + AnalysisType.COLUMN) + .build(); + OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskScheduler, analysisTaskInfo); + new MockUp() { + @Mock + public synchronized BaseAnalysisTask getPendingTasks() { + return task; + } + }; + new MockUp() { + @Mock + public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {} + }; + new Expectations() { + { + task.execute(); + times = 1; + } + }; + Deencapsulation.invoke(analysisTaskExecutor, "doFetchAndExecute"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java new file mode 100644 index 0000000000..7188342b83 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/statistics/MVStatisticsTest.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.statistics; + +import org.apache.doris.catalog.Env; +import org.apache.doris.common.jmockit.Deencapsulation; +import org.apache.doris.statistics.util.StatisticsUtil; +import org.apache.doris.system.SystemInfoService; +import org.apache.doris.utframe.TestWithFeService; + +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mock; +import mockit.MockUp; +import mockit.Tested; +import org.junit.jupiter.api.Test; + +public class MVStatisticsTest extends TestWithFeService { + + @Injectable + StatisticsCache statisticsCache; + + @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase(SystemInfoService.DEFAULT_CLUSTER + ":" + "test"); + createTable("CREATE TABLE t1 (col1 int not null, col2 int not null, col3 int not null)\n" + + "DISTRIBUTED BY HASH(col3)\n" + + "BUCKETS 1\n" + + "PROPERTIES(\n" + + " \"replication_num\"=\"1\"\n" + + ");\n"); + createMv("CREATE MATERIALIZED VIEW mv1 AS SELECT col3 , SUM(COL2) FROM t1 group by col3"); + } + + @Tested + + @Test + public void testCreate() throws Exception { + new Expectations() { + { + statisticsCache.refreshSync(anyLong, anyString); + times = 5; + } + }; + new MockUp() { + }; + new MockUp() { + + @Mock + public void execUpdate(String sql) throws Exception {} + }; + new MockUp(OlapAnalysisTask.class) { + + @Mock + public void execSQL(String sql) throws Exception {} + }; + new MockUp() { + + @Mock + public StatisticsCache getStatisticsCache() { + return statisticsCache; + } + }; + AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager(); + Deencapsulation.setField(analysisManager, "statisticsCache", statisticsCache); + getSqlStmtExecutor("analyze t1"); + Thread.sleep(3000); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java index 90b0e4658d..5ecce69719 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/TestWithFeService.java @@ -41,7 +41,6 @@ import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.DiskInfo; import org.apache.doris.catalog.Env; -import org.apache.doris.catalog.InternalSchemaInitializer; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Replica; import org.apache.doris.catalog.Table; @@ -121,13 +120,16 @@ public abstract class TestWithFeService { @BeforeAll public final void beforeAll() throws Exception { - InternalSchemaInitializer.forTest = true; beforeCreatingConnectContext(); connectContext = createDefaultCtx(); + beforeCluster(); createDorisCluster(); runBeforeAll(); } + protected void beforeCluster() { + } + @AfterAll public final void afterAll() throws Exception { runAfterAll();