[feature](stats) Support sync analyze (#18567)

Grammar:

```
ANALYZE [SYNC] TABLE ....
```

Add this feature so that we can test and tune the stats framework conveniently.
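
For example (a minimal sketch; `db1.t1` is a placeholder table name, the blocking behavior is the point):

```
-- Default behavior: submit the collection tasks to the scheduler
-- and return immediately.
ANALYZE TABLE db1.t1;

-- With SYNC: run the tasks inline and block until collection finishes,
-- so the new stats are queryable right away and any failure is
-- reported on the statement itself (see syncExecute below).
ANALYZE SYNC TABLE db1.t1;
```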
AKIRA
2023-04-12 18:49:30 +09:00, committed by GitHub
parent b93e04ab66, commit db44970685
21 changed files with 140 additions and 44 deletions


```
@@ -918,6 +918,8 @@ nonterminal MVRefreshInfo.RefreshMethod opt_refresh_method;
 nonterminal MVRefreshTriggerInfo opt_refresh_trigger;
 nonterminal MVRefreshInfo opt_mv_refersh_info;
 nonterminal PartitionDesc opt_mv_partition;
+nonterminal Boolean opt_sync;

 precedence nonassoc COMMA;
 precedence nonassoc STRING_LITERAL;
@@ -2809,30 +2811,30 @@ show_create_reporitory_stmt ::=
 // analyze statment
 analyze_stmt ::=
-    KW_ANALYZE KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
+    KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
     {:
        boolean is_whole_tbl = (cols == null);
        boolean is_histogram = false;
        boolean is_increment = false;
-       RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
+       RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
     :}
-    | KW_ANALYZE KW_INCREMENTAL KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
+    | KW_ANALYZE opt_sync:sync KW_INCREMENTAL KW_TABLE table_name:tbl opt_col_list:cols opt_partition_names:partitionNames opt_properties:properties
     {:
        boolean is_whole_tbl = (cols == null);
        boolean is_histogram = false;
        boolean is_increment = true;
-       RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
+       RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
     :}
-    | KW_ANALYZE KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM KW_ON ident_list:cols opt_partition_names:partitionNames opt_properties:properties
+    | KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM KW_ON ident_list:cols opt_partition_names:partitionNames opt_properties:properties
     {:
        boolean is_whole_tbl = false;
        boolean is_histogram = true;
        boolean is_increment = false;
-       RESULT = new AnalyzeStmt(tbl, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
+       RESULT = new AnalyzeStmt(tbl, sync, cols, partitionNames, properties, is_whole_tbl, is_histogram, is_increment);
     :}
-    | KW_ANALYZE KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM
+    | KW_ANALYZE opt_sync:sync KW_TABLE table_name:tbl KW_UPDATE KW_HISTOGRAM
     {:
-       RESULT = new AnalyzeStmt(tbl, null, null, new HashMap<>(), true, true, false);
+       RESULT = new AnalyzeStmt(tbl, sync, null, null, new HashMap<>(), true, true, false);
     :}
     ;
@@ -6932,6 +6934,16 @@ type_func_name_keyword ::=
     {: RESULT = id; :}
     ;

+opt_sync ::=
+    {:
+       RESULT = false;
+    :}
+    | KW_SYNC
+    {:
+       RESULT = true;
+    :}
+    ;
+
 // Keyword that we allow for identifiers
 keyword ::=
     KW_AFTER:id
```
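
Since `opt_sync` defaults to false, existing statements keep their asynchronous behavior; each production simply gains a blocking variant. The amended grammar accepts, for instance (placeholder table name; the `UPDATE HISTOGRAM ON <ident_list>` form takes `SYNC` the same way):

```
ANALYZE SYNC TABLE db1.t1;
ANALYZE SYNC INCREMENTAL TABLE db1.t1;
ANALYZE SYNC TABLE db1.t1 UPDATE HISTOGRAM;
```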


```
@@ -73,6 +73,8 @@ public class AnalyzeStmt extends DdlStmt {
     public boolean isIncrement;

     private final TableName tableName;
+    private final boolean sync;
     private final PartitionNames partitionNames;
     private final List<String> columnNames;
     private final Map<String, String> properties;
@@ -82,6 +84,7 @@ public class AnalyzeStmt extends DdlStmt {
     private TableIf table;

     public AnalyzeStmt(TableName tableName,
+            boolean sync,
             List<String> columnNames,
             PartitionNames partitionNames,
             Map<String, String> properties,
@@ -89,6 +92,7 @@ public class AnalyzeStmt extends DdlStmt {
             Boolean isHistogram,
             Boolean isIncrement) {
         this.tableName = tableName;
+        this.sync = sync;
         this.columnNames = columnNames;
         this.partitionNames = partitionNames;
         this.properties = properties;
@@ -275,4 +279,8 @@ public class AnalyzeStmt extends DdlStmt {
         return sb.toString();
     }

+    public boolean isSync() {
+        return sync;
+    }
 }
```


```
@@ -50,7 +50,6 @@ import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.statistics.AnalysisTaskInfo;
 import org.apache.doris.statistics.AnalysisTaskInfo.AnalysisType;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.HistogramTask;
 import org.apache.doris.statistics.MVAnalysisTask;
@@ -1034,14 +1033,14 @@ public class OlapTable extends Table {
     }

     @Override
-    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskInfo info) {
         if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
-            return new HistogramTask(scheduler, info);
+            return new HistogramTask(info);
         }
         if (info.analysisType.equals(AnalysisType.COLUMN)) {
-            return new OlapAnalysisTask(scheduler, info);
+            return new OlapAnalysisTask(info);
         }
-        return new MVAnalysisTask(scheduler, info);
+        return new MVAnalysisTask(info);
     }

     @Override
```
@Override


```
@@ -28,7 +28,6 @@ import org.apache.doris.common.util.SqlUtils;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.external.hudi.HudiTable;
 import org.apache.doris.statistics.AnalysisTaskInfo;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -522,7 +521,7 @@ public abstract class Table extends MetaObject implements Writable, TableIf {
     }

     @Override
-    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
```


```
@@ -21,7 +21,6 @@ import org.apache.doris.alter.AlterCancelException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.statistics.AnalysisTaskInfo;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -125,7 +124,7 @@ public interface TableIf {
     TTableDescriptor toThrift();

-    BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info);
+    BaseAnalysisTask createAnalysisTask(AnalysisTaskInfo info);

     long estimatedRowCount();
```


```
@@ -30,7 +30,6 @@ import org.apache.doris.datasource.ExternalSchemaCache;
 import org.apache.doris.persist.gson.GsonPostProcessable;
 import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.statistics.AnalysisTaskInfo;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.thrift.TTableDescriptor;
@@ -301,7 +300,7 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
     }

     @Override
-    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskInfo info) {
         throw new NotImplementedException();
     }
```


```
@@ -24,7 +24,6 @@ import org.apache.doris.common.MetaNotFoundException;
 import org.apache.doris.datasource.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
 import org.apache.doris.statistics.AnalysisTaskInfo;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.BaseAnalysisTask;
 import org.apache.doris.statistics.HiveAnalysisTask;
 import org.apache.doris.statistics.IcebergAnalysisTask;
@@ -252,13 +251,13 @@ public class HMSExternalTable extends ExternalTable {
     }

     @Override
-    public BaseAnalysisTask createAnalysisTask(AnalysisTaskScheduler scheduler, AnalysisTaskInfo info) {
+    public BaseAnalysisTask createAnalysisTask(AnalysisTaskInfo info) {
         makeSureInitialized();
         switch (dlaType) {
             case HIVE:
-                return new HiveAnalysisTask(scheduler, info);
+                return new HiveAnalysisTask(info);
             case ICEBERG:
-                return new IcebergAnalysisTask(scheduler, info);
+                return new IcebergAnalysisTask(info);
             default:
                 throw new IllegalArgumentException("Analysis job for dlaType " + dlaType + " not supported.");
         }
```


```
@@ -43,6 +43,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;

 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -156,6 +158,10 @@ public class AnalysisManager {
                 olapTable.readUnlock();
             }
         }
+        if (analyzeStmt.isSync()) {
+            syncExecute(analysisTaskInfos.values());
+            return;
+        }
         analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
         analysisTaskInfos.values().forEach(taskScheduler::schedule);
     }
@@ -210,4 +216,22 @@ public class AnalysisManager {
         return results;
     }

+    private void syncExecute(Collection<AnalysisTaskInfo> taskInfos) {
+        List<String> colNames = new ArrayList<>();
+        for (AnalysisTaskInfo info : taskInfos) {
+            try {
+                TableIf table = StatisticsUtil.findTable(info.catalogName,
+                        info.dbName, info.tblName);
+                BaseAnalysisTask analysisTask = table.createAnalysisTask(info);
+                analysisTask.execute();
+            } catch (Throwable t) {
+                colNames.add(info.colName);
+                LOG.info("Failed to analyze, info: {}", info);
+            }
+        }
+        if (!colNames.isEmpty()) {
+            throw new RuntimeException("Failed to analyze following columns: " + String.join(",", colNames));
+        }
+    }
 }
```


```
@@ -48,7 +48,7 @@ public class AnalysisTaskScheduler {
         try {
             TableIf table = StatisticsUtil.findTable(analysisTaskInfo.catalogName,
                     analysisTaskInfo.dbName, analysisTaskInfo.tblName);
-            BaseAnalysisTask analysisTask = table.createAnalysisTask(this, analysisTaskInfo);
+            BaseAnalysisTask analysisTask = table.createAnalysisTask(analysisTaskInfo);
             switch (analysisTaskInfo.jobType) {
                 case MANUAL:
                     addToManualJobQueue(analysisTask);
```


```
@@ -86,8 +86,6 @@ public abstract class BaseAnalysisTask {
             + " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
             + " ) t1, \n";

-    protected AnalysisTaskScheduler analysisTaskScheduler;
-
     protected AnalysisTaskInfo info;

     protected CatalogIf catalog;
@@ -109,8 +107,7 @@ public abstract class BaseAnalysisTask {
     }

-    public BaseAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        this.analysisTaskScheduler = analysisTaskScheduler;
+    public BaseAnalysisTask(AnalysisTaskInfo info) {
         this.info = info;
         init(info);
     }
```


```
@@ -26,8 +26,8 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
     protected HMSExternalTable table;

-    public HMSAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public HMSAnalysisTask(AnalysisTaskInfo info) {
+        super(info);
         table = (HMSExternalTable) tbl;
     }
```


```
@@ -59,8 +59,8 @@ public class HistogramTask extends BaseAnalysisTask {
         super();
     }

-    public HistogramTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public HistogramTask(AnalysisTaskInfo info) {
+        super(info);
     }

     @Override
```


```
@@ -57,8 +57,8 @@ public class HiveAnalysisTask extends HMSAnalysisTask {
     public static final String TIMESTAMP = "transient_lastDdlTime";
     public static final String DELIMITER = "-";

-    public HiveAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public HiveAnalysisTask(AnalysisTaskInfo info) {
+        super(info);
     }

     private static final String ANALYZE_PARTITION_SQL_TEMPLATE = "INSERT INTO "
```


```
@@ -45,8 +45,8 @@ public class IcebergAnalysisTask extends HMSAnalysisTask {
     private long dataSize = 0;
     private long numNulls = 0;

-    public IcebergAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public IcebergAnalysisTask(AnalysisTaskInfo info) {
+        super(info);
     }

     private static final String INSERT_TABLE_SQL_TEMPLATE = "INSERT INTO "
```


```
@@ -62,8 +62,8 @@ public class MVAnalysisTask extends BaseAnalysisTask {
     private OlapTable olapTable;

-    public MVAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public MVAnalysisTask(AnalysisTaskInfo info) {
+        super(info);
         init();
     }
```


```
@@ -53,8 +53,8 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
         super();
     }

-    public OlapAnalysisTask(AnalysisTaskScheduler analysisTaskScheduler, AnalysisTaskInfo info) {
-        super(analysisTaskScheduler, info);
+    public OlapAnalysisTask(AnalysisTaskInfo info) {
+        super(info);
     }

     public void execute() throws Exception {
```


```
@@ -113,7 +113,7 @@ public class AnalysisJobTest extends TestWithFeService {
                 AnalysisType.COLUMN)
                 .setPartitionNames(Sets.newHashSet("t1"))
                 .build();
-        new OlapAnalysisTask(scheduler, analysisJobInfo).execute();
+        new OlapAnalysisTask(analysisJobInfo).execute();
     }
 }
```


```
@@ -66,7 +66,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
                 .setColName("col1").setJobType(JobType.MANUAL).setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(
                         AnalysisType.COLUMN)
                 .build();
-        OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisTaskScheduler, analysisJobInfo);
+        OlapAnalysisTask analysisJob = new OlapAnalysisTask(analysisJobInfo);

         new MockUp<AnalysisTaskScheduler>() {
             public synchronized BaseAnalysisTask getPendingTasks() {
@@ -98,7 +98,7 @@ public class AnalysisTaskExecutorTest extends TestWithFeService {
                         AnalysisType.COLUMN)
                 .setPartitionNames(Sets.newHashSet("t1"))
                 .build();
-        OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskScheduler, analysisTaskInfo);
+        OlapAnalysisTask task = new OlapAnalysisTask(analysisTaskInfo);

         new MockUp<AnalysisTaskScheduler>() {
             @Mock
             public synchronized BaseAnalysisTask getPendingTasks() {
```


```
@@ -107,7 +107,7 @@ public class HistogramTaskTest extends TestWithFeService {
                 .setAnalysisType(AnalysisType.HISTOGRAM)
                 .setPartitionNames(Sets.newHashSet("t"))
                 .build();
-        HistogramTask task = new HistogramTask(analysisTaskScheduler, analysisTaskInfo);
+        HistogramTask task = new HistogramTask(analysisTaskInfo);

         new MockUp<AnalysisTaskScheduler>() {
             @Mock
```

```
@@ -0,0 +1,9 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !sql --
+5 4 0 1 8 20
+5 4 0 1 8 20
+5 5 0 1 7 5
+5 5 0 1 7 5
+5 5 0 1 9 20
+5 5 0 1 9 20
```


```
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("analyze_test") {
+    sql """
+        DROP TABLE IF EXISTS test_table_alter_column_stats
+    """
+    sql """CREATE TABLE test_table_alter_column_stats (col1 varchar(11451) not null, col2 int not null, col3 int not null)
+        UNIQUE KEY(col1)
+        DISTRIBUTED BY HASH(col1)
+        BUCKETS 3
+        PROPERTIES(
+            "replication_num"="1",
+            "enable_unique_key_merge_on_write"="true"
+        );"""
+    sql """insert into test_table_alter_column_stats values(1, 2, 3);"""
+    sql """insert into test_table_alter_column_stats values(4, 5, 6);"""
+    sql """insert into test_table_alter_column_stats values(7, 1, 9);"""
+    sql """insert into test_table_alter_column_stats values(3, 8, 2);"""
+    sql """insert into test_table_alter_column_stats values(5, 2, 1);"""
+    sql """delete from __internal_schema.column_statistics where col_id in ('col1', 'col2', 'col3')"""
+    sql """
+        analyze sync table test_table_alter_column_stats;
+    """
+    order_qt_sql """
+        select count, ndv, null_count, min, max, data_size_in_bytes from __internal_schema.column_statistics where
+        col_id in ('col1', 'col2', 'col3') order by col_id
+    """
+    sql """
+        DROP TABLE IF EXISTS test_table_alter_column_stats
+    """
+}
```