[refactor](stats) refactor collection logic and optimize some configs (#26163)
1. Stop collecting partition-level stats. 2. Merge the INSERTs that persist stats. 3. Delete the period collector, since it is no longer useful. 4. Remove enable_auto_sample. 5. Move some stats-related configs to global session variables. Before this PR, analyzing a table issued roughly two INSERT statements per column. After this PR, the INSERT count drops to about column count / stats_insert_merge_item_count. In my test, analyzing the TPC-H lineitem table needs only a single INSERT.
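The batching described above can be sketched with a small, self-contained Java example. This is not the Doris implementation: the table name, the fake value tuples, and the helper buildMergedInserts are illustrative assumptions. It only shows why grouping per-column value tuples into multi-row INSERTs of at most stats_insert_merge_item_count tuples collapses the statement count to roughly columns / batch size.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Minimal sketch of the "merge insert of stats" idea: instead of one INSERT per
// column (or per partition), buffer the per-column value tuples and flush them
// as multi-row INSERTs once the batch reaches the configured size.
// STATS_TABLE and the tuple format are illustrative, not the real Doris schema.
public class StatsInsertMergeSketch {
    static final String STATS_TABLE = "__internal_schema.column_statistics"; // assumed target table

    // Group value tuples (one per column) into INSERT statements of at most mergeItemCount tuples.
    static List<String> buildMergedInserts(List<String> valueTuples, int mergeItemCount) {
        List<String> inserts = new ArrayList<>();
        for (int from = 0; from < valueTuples.size(); from += mergeItemCount) {
            int to = Math.min(from + mergeItemCount, valueTuples.size());
            StringJoiner values = new StringJoiner(",");
            for (String tuple : valueTuples.subList(from, to)) {
                values.add(tuple);
            }
            inserts.add("INSERT INTO " + STATS_TABLE + " VALUES " + values);
        }
        return inserts;
    }

    public static void main(String[] args) {
        // 16 columns worth of stats, batch size 200 (the default of stats_insert_merge_item_count)
        List<String> tuples = new ArrayList<>();
        for (int i = 0; i < 16; i++) {
            tuples.add("('col_" + i + "', 100, 10, 0)"); // fake (id, row_count, ndv, null_count) tuple
        }
        List<String> inserts = buildMergedInserts(tuples, 200);
        System.out.println(inserts.size() + " INSERT statement(s)"); // prints: 1 INSERT statement(s)
    }
}
```

With the default batch size of 200, any table with at most 200 columns needs a single INSERT, which matches the lineitem observation above.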
@@ -230,7 +230,6 @@ import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsPeriodCollector;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
@@ -495,8 +494,6 @@ public class Env {

private StatisticsAutoCollector statisticsAutoCollector;

private StatisticsPeriodCollector statisticsPeriodCollector;

private HiveTransactionMgr hiveTransactionMgr;

private TopicPublisherThread topicPublisherThread;
@@ -720,7 +717,6 @@ public class Env {
this.analysisManager = new AnalysisManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsPeriodCollector = new StatisticsPeriodCollector();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.queryStats = new QueryStats();
@@ -971,9 +967,6 @@ public class Env {
if (statisticsAutoCollector != null) {
statisticsAutoCollector.start();
}
if (statisticsPeriodCollector != null) {
statisticsPeriodCollector.start();
}

queryCancelWorker.start();
@@ -54,7 +54,6 @@ import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.HistogramTask;
import org.apache.doris.statistics.MVAnalysisTask;
import org.apache.doris.statistics.OlapAnalysisTask;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.util.StatisticsUtil;
@@ -1122,11 +1121,9 @@ public class OlapTable extends Table {
public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
if (info.analysisType.equals(AnalysisType.HISTOGRAM)) {
return new HistogramTask(info);
}
if (info.analysisType.equals(AnalysisType.FUNDAMENTALS)) {
} else {
return new OlapAnalysisTask(info);
}
return new MVAnalysisTask(info);
}

public boolean needReAnalyzeTable(TableStatsMeta tblStats) {
@@ -1146,7 +1143,7 @@ public class OlapTable extends Table {
}
long updateRows = tblStats.updatedRows.get();
int tblHealth = StatisticsUtil.getTableHealth(rowCount, updateRows);
return tblHealth < Config.table_stats_health_threshold;
return tblHealth < StatisticsUtil.getTableStatsHealthThreshold();
}

@Override
@@ -569,6 +569,10 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> {
}

private ColumnStatistic getColumnStatistic(TableIf table, String colName) {
ConnectContext connectContext = ConnectContext.get();
if (connectContext != null && connectContext.getSessionVariable().internalSession) {
return ColumnStatistic.UNKNOWN;
}
long catalogId;
long dbId;
try {
@@ -62,6 +62,7 @@ import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * System variable.
@@ -430,6 +431,17 @@ public class SessionVariable implements Serializable, Writable {

public static final String ENABLE_DECIMAL256 = "enable_decimal256";

public static final String STATS_INSERT_MERGE_ITEM_COUNT = "stats_insert_merge_item_count";

public static final String HUGE_TABLE_DEFAULT_SAMPLE_ROWS = "huge_table_default_sample_rows";
public static final String HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = "huge_table_lower_bound_size_in_bytes";

public static final String HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS
= "huge_table_auto_analyze_interval_in_millis";

public static final String TABLE_STATS_HEALTH_THRESHOLD
= "table_stats_health_threshold";

public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -486,7 +498,7 @@ public class SessionVariable implements Serializable, Writable {
public int queryTimeoutS = 900;

// query timeout in second.
@VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, needForward = true)
@VariableMgr.VarAttr(name = ANALYZE_TIMEOUT, flag = VariableMgr.GLOBAL, needForward = true)
public int analyzeTimeoutS = 43200;

// The global max_execution_time value provides the default for the session value for new connections.
@@ -1246,7 +1258,7 @@ public class SessionVariable implements Serializable, Writable {
description = {"该参数定义自动ANALYZE例程的结束时间",
"This parameter defines the end time for the automatic ANALYZE routine."},
flag = VariableMgr.GLOBAL)
public String fullAutoAnalyzeEndTime = "02:00:00";
public String fullAutoAnalyzeEndTime = "23:59:59";

@VariableMgr.VarAttr(name = SQL_DIALECT, needForward = true, checker = "checkSqlDialect",
description = {"解析sql使用的方言", "The dialect used to parse sql."})
@@ -1276,6 +1288,48 @@ public class SessionVariable implements Serializable, Writable {
"the runtime filter id in IGNORE_RUNTIME_FILTER_IDS list will not be generated"})

public String ignoreRuntimeFilterIds = "";

@VariableMgr.VarAttr(name = STATS_INSERT_MERGE_ITEM_COUNT, flag = VariableMgr.GLOBAL, description = {
"控制统计信息相关INSERT攒批数量", "Controls the batch size for stats INSERT merging."
}
)
public int statsInsertMergeItemCount = 200;

@VariableMgr.VarAttr(name = HUGE_TABLE_DEFAULT_SAMPLE_ROWS, flag = VariableMgr.GLOBAL, description = {
"定义开启开启大表自动sample后,对大表的采样比例",
"This defines the number of sample percent for large tables when automatic sampling for"
+ "large tables is enabled"

})
public long hugeTableDefaultSampleRows = 4194304;


@VariableMgr.VarAttr(name = HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES, flag = VariableMgr.GLOBAL,
description = {
"大小超过该值的表将会自动通过采样收集统计信息",
"This defines the lower size bound for large tables. "
+ "When enable_auto_sample is enabled, tables"
+ "larger than this value will automatically collect "
+ "statistics through sampling"})
public long hugeTableLowerBoundSizeInBytes = 5L * 1024 * 1024 * 1024;

@VariableMgr.VarAttr(name = HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS, flag = VariableMgr.GLOBAL,
description = {"控制对大表的自动ANALYZE的最小时间间隔,"
+ "在该时间间隔内大小超过huge_table_lower_bound_size_in_bytes的表仅ANALYZE一次",
"This controls the minimum time interval for automatic ANALYZE on large tables."
+ "Within this interval,"
+ "tables larger than huge_table_lower_bound_size_in_bytes are analyzed only once."})
public long hugeTableAutoAnalyzeIntervalInMillis = TimeUnit.HOURS.toMillis(12);

@VariableMgr.VarAttr(name = TABLE_STATS_HEALTH_THRESHOLD, flag = VariableMgr.GLOBAL,
description = {"取值在0-100之间,当自上次统计信息收集操作之后"
+ "数据更新量达到 (100 - table_stats_health_threshold)% ,认为该表的统计信息已过时",
"The value should be between 0 and 100. When the data update quantity "
+ "exceeds (100 - table_stats_health_threshold)% since the last "
+ "statistics collection operation, the statistics for this table are"
+ "considered outdated."})
public int tableStatsHealthThreshold = 60;

public static final String IGNORE_RUNTIME_FILTER_IDS = "ignore_runtime_filter_ids";

public Set<Integer> getIgnoredRuntimeFilterIds() {
@@ -0,0 +1,193 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.StringJoiner;

public class AnalysisJob {

public static final Logger LOG = LogManager.getLogger(AnalysisJob.class);

protected Set<BaseAnalysisTask> queryingTask;

protected Set<BaseAnalysisTask> queryFinished;

protected List<ColStatsData> buf;

protected int totalTaskCount;

protected int queryFinishedTaskCount;

protected StmtExecutor stmtExecutor;

protected boolean killed;

protected long start;

protected AnalysisInfo jobInfo;

protected AnalysisManager analysisManager;

public AnalysisJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> queryingTask) {
for (BaseAnalysisTask task : queryingTask) {
task.job = this;
}
this.queryingTask = new HashSet<>(queryingTask);
this.queryFinished = new HashSet<>();
this.buf = new ArrayList<>();
totalTaskCount = queryingTask.size();
start = System.currentTimeMillis();
this.jobInfo = jobInfo;
this.analysisManager = Env.getCurrentEnv().getAnalysisManager();
}

public synchronized void appendBuf(BaseAnalysisTask task, List<ColStatsData> statsData) {
queryingTask.remove(task);
buf.addAll(statsData);
queryFinished.add(task);
queryFinishedTaskCount += 1;
if (queryFinishedTaskCount == totalTaskCount) {
writeBuf();
updateTaskState(AnalysisState.FINISHED, "Cost time in sec: "
+ (System.currentTimeMillis() - start) / 1000);
deregisterJob();
} else if (buf.size() >= StatisticsUtil.getInsertMergeCount()) {
writeBuf();
}
}

// CHECKSTYLE OFF
// fallthrough here is expected
public void updateTaskState(AnalysisState state, String msg) {
long time = System.currentTimeMillis();
switch (state) {
case FAILED:
for (BaseAnalysisTask task : queryingTask) {
analysisManager.updateTaskStatus(task.info, state, msg, time);
task.cancel();
}
killed = true;
case FINISHED:
for (BaseAnalysisTask task : queryFinished) {
analysisManager.updateTaskStatus(task.info, state, msg, time);
}
default:
// DO NOTHING
}
}

protected void writeBuf() {
if (killed) {
return;
}
// buf could be empty when nothing need to do, for example user submit an analysis task for table with no data
// change
if (!buf.isEmpty()) {
String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
StringJoiner values = new StringJoiner(",");
for (ColStatsData data : buf) {
values.add(data.toSQL(true));
}
insertStmt += values.toString();
int retryTimes = 0;
while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (killed) {
return;
}
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
executeWithExceptionOnFail(stmtExecutor);
break;
} catch (Exception t) {
LOG.warn("Failed to write buf: " + insertStmt, t);
retryTimes++;
if (retryTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
updateTaskState(AnalysisState.FAILED, t.getMessage());
return;
}
}
}
}
updateTaskState(AnalysisState.FINISHED, "");
syncLoadStats();
queryFinished.clear();
}

protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
if (killed) {
return;
}
LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
try {
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(
"Failed to insert : " + stmtExecutor.getOriginStmt().originStmt + "Error msg: "
+ queryState.getErrorMessage());
}
} finally {
AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
true);
}
}

public void taskFailed(BaseAnalysisTask task, String reason) {
updateTaskState(AnalysisState.FAILED, reason);
cancel();
deregisterJob();
}

public void cancel() {
for (BaseAnalysisTask task : queryingTask) {
task.cancel();
}
}

public void deregisterJob() {
analysisManager.removeJob(jobInfo.jobId);
}

protected void syncLoadStats() {
long tblId = jobInfo.tblId;
for (BaseAnalysisTask task : queryFinished) {
String colName = task.col.getName();
if (!Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tblId, -1, colName)) {
analysisManager.removeColStatsStatus(tblId, colName);
}
}
}

}
@@ -42,7 +42,6 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.PrivPredicate;
@@ -101,7 +100,7 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class AnalysisManager extends Daemon implements Writable {
public class AnalysisManager implements Writable {

private static final Logger LOG = LogManager.getLogger(AnalysisManager.class);

@@ -113,11 +112,11 @@ public class AnalysisManager extends Daemon implements Writable {
private AnalysisTaskExecutor taskExecutor;

// Store task information in metadata.
private final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
protected final NavigableMap<Long, AnalysisInfo> analysisTaskInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());

// Store job information in metadata.
private final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
protected final NavigableMap<Long, AnalysisInfo> analysisJobInfoMap =
Collections.synchronizedNavigableMap(new TreeMap<>());

// Tracking system submitted job, keep in mem only
@@ -128,6 +127,8 @@ public class AnalysisManager extends Daemon implements Writable {

private final Map<Long, TableStatsMeta> idToTblStats = new ConcurrentHashMap<>();

private final Map<Long, AnalysisJob> idToAnalysisJob = new ConcurrentHashMap<>();

protected SimpleQueue<AnalysisInfo> autoJobs = createSimpleQueue(null, this);

private final Function<TaskStatusWrapper, Void> userJobStatusUpdater = w -> {
@@ -237,7 +238,6 @@ public class AnalysisManager extends Daemon implements Writable {
new Function[] {userJobStatusUpdater, systemJobStatusUpdater};

public AnalysisManager() {
super(TimeUnit.SECONDS.toMillis(StatisticConstants.ANALYZE_MANAGER_INTERVAL_IN_SECS));
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num);
this.statisticsCache = new StatisticsCache();
@@ -245,44 +245,6 @@ public class AnalysisManager extends Daemon implements Writable {
}
}

@Override
protected void runOneCycle() {
clear();
}

private void clear() {
clearExpiredAnalysisInfo(analysisJobInfoMap, (a) ->
a.scheduleType.equals(ScheduleType.ONCE)
&& System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
(id) -> {
Env.getCurrentEnv().getEditLog().logDeleteAnalysisJob(new AnalyzeDeletionLog(id));
return null;
});
clearExpiredAnalysisInfo(analysisTaskInfoMap, (a) -> System.currentTimeMillis() - a.lastExecTimeInMs
> TimeUnit.DAYS.toMillis(StatisticConstants.ANALYSIS_JOB_INFO_EXPIRATION_TIME_IN_DAYS),
(id) -> {
Env.getCurrentEnv().getEditLog().logDeleteAnalysisTask(new AnalyzeDeletionLog(id));
return null;
});
}

private void clearExpiredAnalysisInfo(Map<Long, AnalysisInfo> infoMap, Predicate<AnalysisInfo> isExpired,
Function<Long, Void> writeLog) {
synchronized (infoMap) {
List<Long> expired = new ArrayList<>();
for (Entry<Long, AnalysisInfo> entry : infoMap.entrySet()) {
if (isExpired.test(entry.getValue())) {
expired.add(entry.getKey());
}
}
for (Long k : expired) {
infoMap.remove(k);
writeLog.apply(k);
}
}
}

public StatisticsCache getStatisticsCache() {
return statisticsCache;
}
@@ -371,6 +333,7 @@ public class AnalysisManager extends Daemon implements Writable {
boolean isSync = stmt.isSync();
Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
createTaskForEachColumns(jobInfo, analysisTaskInfos, isSync);
constructJob(jobInfo, analysisTaskInfos.values());
if (!jobInfo.partitionOnly && stmt.isAllColumns()
&& StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, isSync);
@@ -446,7 +409,6 @@ public class AnalysisManager extends Daemon implements Writable {
*/
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType) throws DdlException {
long tableId = table.getId();

Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
@@ -467,27 +429,6 @@ public class AnalysisManager extends Daemon implements Writable {
return columnToPartitions;
}

// Get the partition granularity statistics that have been collected
Map<String, Set<String>> existColAndPartsForStats = StatisticsRepository
.fetchColAndPartsForStats(tableId);

if (existColAndPartsForStats.isEmpty()) {
// There is no historical statistical information, no need to do validation
return columnToPartitions;
}

Set<String> existPartIdsForStats = new HashSet<>();
existColAndPartsForStats.values().forEach(existPartIdsForStats::addAll);
Set<String> idToPartition = StatisticsUtil.getPartitionIds(table);
// Get an invalid set of partitions (those partitions were deleted)
Set<String> invalidPartIds = existPartIdsForStats.stream()
.filter(id -> !idToPartition.contains(id)).collect(Collectors.toSet());

if (!invalidPartIds.isEmpty()) {
// Delete invalid partition statistics to avoid affecting table statistics
StatisticsRepository.dropStatistics(invalidPartIds);
}

if (analysisType == AnalysisType.FUNDAMENTALS) {
Map<String, Set<String>> result = table.findReAnalyzeNeededPartitions();
result.keySet().retainAll(columnNames);
@@ -721,11 +662,12 @@ public class AnalysisManager extends Daemon implements Writable {
public void syncExecute(Collection<BaseAnalysisTask> tasks) {
SyncTaskCollection syncTaskCollection = new SyncTaskCollection(tasks);
ConnectContext ctx = ConnectContext.get();
ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
try {
ctxToSyncTask.put(ctx, syncTaskCollection);
ThreadPoolExecutor syncExecPool = createThreadPoolForSyncAnalyze();
syncTaskCollection.execute(syncExecPool);
} finally {
syncExecPool.shutdown();
ctxToSyncTask.remove(ctx);
}
}
@@ -738,7 +680,7 @@ public class AnalysisManager extends Daemon implements Writable {
new SynchronousQueue(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SYNC ANALYZE" + "-%d")
.build(), new BlockedPolicy(poolName,
(int) TimeUnit.HOURS.toSeconds(Config.analyze_task_timeout_in_hours)));
StatisticsUtil.getAnalyzeTimeout()));
}

public void dropStats(DropStatsStmt dropStatsStmt) throws DdlException {
@@ -760,6 +702,7 @@ public class AnalysisManager extends Daemon implements Writable {
for (String col : cols) {
Env.getCurrentEnv().getStatisticsCache().invalidate(tblId, -1L, col);
}
tableStats.updatedTime = 0;
}
logCreateTableStats(tableStats);
StatisticsRepository.dropStatistics(tblId, cols);
@@ -1129,4 +1072,17 @@ public class AnalysisManager extends Daemon implements Writable {
}
return tableStats.findColumnStatsMeta(colName);
}

public AnalysisJob findJob(long id) {
return idToAnalysisJob.get(id);
}

public void constructJob(AnalysisInfo jobInfo, Collection<? extends BaseAnalysisTask> tasks) {
AnalysisJob job = new AnalysisJob(jobInfo, tasks);
idToAnalysisJob.put(jobInfo.jobId, job);
}

public void removeJob(long id) {
idToAnalysisJob.remove(id);
}
}
@@ -18,9 +18,9 @@
package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.ThreadPoolManager.BlockedPolicy;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,7 +36,7 @@ public class AnalysisTaskExecutor extends Thread {

private static final Logger LOG = LogManager.getLogger(AnalysisTaskExecutor.class);

private final ThreadPoolExecutor executors;
protected final ThreadPoolExecutor executors;

private final BlockingQueue<AnalysisTaskWrapper> taskQueue =
new PriorityBlockingQueue<AnalysisTaskWrapper>(20,
@@ -72,18 +72,22 @@ public class AnalysisTaskExecutor extends Thread {

private void doCancelExpiredJob() {
for (;;) {
tryToCancel();
}
}

protected void tryToCancel() {
try {
AnalysisTaskWrapper taskWrapper = taskQueue.take();
try {
AnalysisTaskWrapper taskWrapper = taskQueue.take();
try {
long timeout = TimeUnit.HOURS.toMillis(Config.analyze_task_timeout_in_hours)
- (System.currentTimeMillis() - taskWrapper.getStartTime());
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
taskWrapper.cancel(e.getMessage());
}
} catch (Throwable throwable) {
LOG.warn(throwable);
long timeout = TimeUnit.SECONDS.toMillis(StatisticsUtil.getAnalyzeTimeout())
- (System.currentTimeMillis() - taskWrapper.getStartTime());
taskWrapper.get(timeout < 0 ? 0 : timeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
taskWrapper.cancel(e.getMessage());
}
} catch (Throwable throwable) {
LOG.warn("cancel analysis task failed", throwable);
}
}
@@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
@@ -59,9 +58,8 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
if (task.info.scheduleType.equals(ScheduleType.AUTOMATIC) && !StatisticsUtil.inAnalyzeTime(
LocalTime.now(TimeUtils.getTimeZone().toZoneId()))) {
// TODO: Do we need a separate AnalysisState here?
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info, AnalysisState.FAILED, "Auto task"
+ "doesn't get executed within specified time range", System.currentTimeMillis());
task.job.taskFailed(task, "Auto task"
+ "doesn't get executed within specified time range");
return;
}
executor.putJob(this);
@@ -76,15 +74,7 @@ public class AnalysisTaskWrapper extends FutureTask<Void> {
if (!task.killed) {
if (except != null) {
LOG.warn("Analyze {} failed.", task.toString(), except);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FAILED, Util.getRootCauseMessage(except), System.currentTimeMillis());
} else {
LOG.debug("Analyze {} finished, cost time:{}", task.toString(),
System.currentTimeMillis() - startTime);
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(task.info,
AnalysisState.FINISHED, "", System.currentTimeMillis());
task.job.taskFailed(task, Util.getRootCauseMessage(except));
}
}
}
@@ -22,14 +22,12 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Config;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo.AnalysisMethod;
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.AnalysisInfo.JobType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;

@@ -38,6 +36,7 @@ import com.google.common.base.Preconditions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public abstract class BaseAnalysisTask {
@@ -52,59 +51,25 @@ public abstract class BaseAnalysisTask {
+ "else NDV(`${colName}`) * ${scaleFactor} end AS ndv, "
;

/**
 * Stats stored in the column_statistics table basically has two types, `part_id` is null which means it is
 * aggregate from partition level stats, `part_id` is not null which means it is partition level stats.
 * For latter, it's id field contains part id, for previous doesn't.
 */
protected static final String INSERT_PART_STATISTICS = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "${partId} AS part_id, "
+ "COUNT(1) AS row_count, "
+ "NDV(`${colName}`) AS ndv, "
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ "MIN(`${colName}`) AS min, "
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() ";

protected static final String INSERT_COL_STATISTICS = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT id, catalog_id, db_id, tbl_id, idx_id, col_id, part_id, row_count, "
+ " ndv, null_count,"
+ " to_base64(CAST(min AS string)), to_base64(CAST(max AS string)), data_size, update_time\n"
+ " FROM \n"
+ " (SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
protected static final String COLLECT_COL_STATISTICS =
"SELECT CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ " ${catalogId} AS catalog_id, "
+ " ${dbId} AS db_id, "
+ " ${tblId} AS tbl_id, "
+ " ${idxId} AS idx_id, "
+ " '${colId}' AS col_id, "
+ " NULL AS part_id, "
+ " SUM(count) AS row_count, \n"
+ " SUM(null_count) AS null_count, "
+ " MIN(CAST(from_base64(min) AS ${type})) AS min, "
+ " MAX(CAST(from_base64(max) AS ${type})) AS max, "
+ " SUM(data_size_in_bytes) AS data_size, "
+ " NOW() AS update_time \n"
+ " FROM ${internalDB}.${columnStatTbl}"
+ " WHERE ${internalDB}.${columnStatTbl}.db_id = '${dbId}' AND "
+ " ${internalDB}.${columnStatTbl}.tbl_id='${tblId}' AND "
+ " ${internalDB}.${columnStatTbl}.col_id='${colId}' AND "
+ " ${internalDB}.${columnStatTbl}.idx_id='${idxId}' AND "
+ " ${internalDB}.${columnStatTbl}.part_id IS NOT NULL"
+ " ) t1, \n";
+ " COUNT(1) AS row_count, "
+ " NDV(`${colName}`) AS ndv, "
+ " COUNT(1) - COUNT(${colName}) AS null_count, "
+ " CAST(MIN(${colName}) AS STRING) AS min, "
+ " CAST(MAX(${colName}) AS STRING) AS max, "
+ " ${dataSizeFunction} AS data_size, "
+ " NOW() AS update_time "
+ " FROM `${dbName}`.`${tblName}`";

protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
protected static final String ANALYZE_PARTITION_COLUMN_TEMPLATE =
" SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -115,8 +80,8 @@ public abstract class BaseAnalysisTask {
+ "${row_count} AS row_count, "
+ "${ndv} AS ndv, "
+ "${null_count} AS null_count, "
+ "to_base64('${min}') AS min, "
+ "to_base64('${max}') AS max, "
+ "'${min}' AS min, "
+ "'${max}' AS max, "
+ "${data_size} AS data_size, "
+ "NOW() ";

@@ -136,6 +101,8 @@ public abstract class BaseAnalysisTask {

protected TableSample tableSample = null;

protected AnalysisJob job;

@VisibleForTesting
public BaseAnalysisTask() {

@@ -192,6 +159,7 @@ public abstract class BaseAnalysisTask {
}
LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
job.taskFailed(this, t.getMessage());
throw new RuntimeException(t);
}
StatisticsUtil.sleep(TimeUnit.SECONDS.toMillis(2 ^ retriedTimes) * 10);
@@ -266,11 +234,10 @@ public abstract class BaseAnalysisTask {
return new TableSample(true, (long) info.samplePercent);
} else if (info.sampleRows > 0) {
return new TableSample(false, info.sampleRows);
} else if (info.analysisMethod == AnalysisMethod.FULL
&& Config.enable_auto_sample
&& tbl.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes) {
} else if (info.jobType.equals(JobType.SYSTEM) && info.analysisMethod == AnalysisMethod.FULL
&& tbl.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes()) {
// If user doesn't specify sample percent/rows, use auto sample and update sample rows in analysis info.
return new TableSample(false, (long) Config.huge_table_default_sample_rows);
return new TableSample(false, StatisticsUtil.getHugeTableSampleRows());
} else {
return null;
}
@@ -283,23 +250,20 @@ public abstract class BaseAnalysisTask {
col == null ? "TableRowCount" : col.getName());
}

protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
if (killed) {
return;
}
LOG.debug("execute internal sql: {}", stmtExecutor.getOriginStmt());
try {
stmtExecutor.execute();
QueryState queryState = stmtExecutor.getContext().getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
catalog.getName(), db.getFullName(), info.colName, stmtExecutor.getOriginStmt().toString(),
queryState.getErrorMessage()));
}
public void setJob(AnalysisJob job) {
this.job = job;
}

protected void runQuery(String sql) {
long startTime = System.currentTimeMillis();
try (AutoCloseConnectContext a = StatisticsUtil.buildConnectContext()) {
stmtExecutor = new StmtExecutor(a.connectContext, sql);
stmtExecutor.executeInternalQuery();
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
job.appendBuf(this, Collections.singletonList(colStatsData));
} finally {
AuditLogHelper.logAuditLog(stmtExecutor.getContext(), stmtExecutor.getOriginStmt().toString(),
stmtExecutor.getParsedStmt(), stmtExecutor.getQueryStatisticsForAuditLog(),
true);
LOG.debug("End cost time in secs: " + (System.currentTimeMillis() - startTime) / 1000);
}
}

}
@@ -19,6 +19,8 @@ package org.apache.doris.statistics;

import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.StringJoiner;
@@ -54,6 +56,18 @@ public class ColStatsData {

public final String updateTime;

@VisibleForTesting
public ColStatsData() {
statsId = new StatsId();
count = 0;
ndv = 0;
nullCount = 0;
minLit = null;
maxLit = null;
dataSizeInBytes = 0;
updateTime = null;
}

public ColStatsData(ResultRow row) {
this.statsId = new StatsId(row);
this.count = (long) Double.parseDouble(row.get(7));
@@ -23,26 +23,19 @@ import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;

public class HMSAnalysisTask extends BaseAnalysisTask {
@@ -51,9 +44,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
// While doing sample analysis, the sampled ndv result will multiply a factor (total size/sample size)
// if ndv(col)/count(col) is greater than this threshold.

private static final String ANALYZE_TABLE_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
private static final String ANALYZE_TABLE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -70,28 +61,9 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";

private static final String ANALYZE_PARTITION_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
+ "${tblId} AS tbl_id, "
+ "${idxId} AS idx_id, "
+ "'${colId}' AS col_id, "
+ "${partId} AS part_id, "
+ "COUNT(1) AS row_count, "
+ "NDV(`${colName}`) AS ndv, "
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ "to_base64(MIN(`${colName}`)) AS min, "
+ "to_base64(MAX(`${colName}`)) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() FROM `${catalogName}`.`${dbName}`.`${tblName}` where ";

private static final String ANALYZE_TABLE_COUNT_TEMPLATE = "SELECT ROUND(COUNT(1) * ${scaleFactor}) as rowCount "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}` ${sampleExpr}";

// cache stats for each partition, it would be inserted into column_statistics in a batch.
private final List<List<ColStatsData>> buf = new ArrayList<>();

private final boolean isTableLevelTask;
private final boolean isPartitionOnly;
private Set<String> partitionNames;
@@ -131,25 +103,16 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
* Get column statistics and insert the result to __internal_schema.column_statistics
*/
private void getTableColumnStats() throws Exception {
if (isPartitionOnly) {
getPartitionNames();
List<String> partitionAnalysisSQLs = new ArrayList<>();
for (String partId : this.partitionNames) {
partitionAnalysisSQLs.add(generateSqlForPartition(partId));
}
execSQLs(partitionAnalysisSQLs);
} else {
if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
try {
getPartitionColumnStats();
} catch (Exception e) {
LOG.warn("Failed to collect stats for partition col {} using metadata, "
+ "fallback to normal collection", col.getName(), e);
getOrdinaryColumnStats();
}
} else {
if (!info.usingSqlForPartitionColumn && isPartitionColumn()) {
try {
getPartitionColumnStats();
} catch (Exception e) {
LOG.warn("Failed to collect stats for partition col {} using metadata, "
+ "fallback to normal collection", col.getName(), e);
getOrdinaryColumnStats();
}
} else {
getOrdinaryColumnStats();
}
}

@@ -182,7 +145,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
params.put("maxFunction", getMaxFunction());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
runQuery(sql);
}

private void getPartitionColumnStats() throws Exception {
@@ -227,7 +190,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
params.put("data_size", String.valueOf(dataSize));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_PARTITION_COLUMN_TEMPLATE);
executeInsertSql(sql);
runQuery(sql);
}

private String updateMinValue(String currentMin, String value) {
@@ -278,7 +241,7 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
partitionNames = table.getPartitionNames();
} else if (info.partitionCount > 0) {
partitionNames = table.getPartitionNames().stream()
.limit(info.partitionCount).collect(Collectors.toSet());
.limit(info.partitionCount).collect(Collectors.toSet());
}
if (partitionNames == null || partitionNames.isEmpty()) {
throw new RuntimeException("Not a partition table or no partition specified.");
@@ -286,80 +249,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
}
}

private String generateSqlForPartition(String partId) {
StringBuilder sb = new StringBuilder();
sb.append(ANALYZE_PARTITION_TEMPLATE);
String[] splits = partId.split("/");
for (int i = 0; i < splits.length; i++) {
String[] kv = splits[i].split("=");
sb.append(kv[0]);
sb.append("='");
sb.append(kv[1]);
sb.append("'");
if (i < splits.length - 1) {
sb.append(" and ");
}
}
Map<String, String> params = buildStatsParams(partId);
params.put("dataSizeFunction", getDataSizeFunction(col));
return new StringSubstitutor(params).replace(sb.toString());
}

public void execSQLs(List<String> partitionAnalysisSQLs) throws Exception {
long startTime = System.currentTimeMillis();
LOG.debug("analyze task {} start at {}", info.toString(), new Date());
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
for (List<String> group : sqlGroups) {
if (killed) {
return;
}
StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
group.forEach(partitionCollectSQL::add);
stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
buf.add(stmtExecutor.executeInternalQuery()
.stream().map(ColStatsData::new).collect(Collectors.toList()));
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
queryState.getErrorMessage()));
}
}
for (List<ColStatsData> colStatsDataList : buf) {
StringBuilder batchInsertSQL =
new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
+ " VALUES ");
StringJoiner sj = new StringJoiner(",");
colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
batchInsertSQL.append(sj);
stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
executeWithExceptionOnFail(stmtExecutor);
}
} finally {
LOG.debug("analyze task {} end. cost {}ms", info, System.currentTimeMillis() - startTime);
}

}

private void executeInsertSql(String sql) throws Exception {
long startTime = System.currentTimeMillis();
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
this.stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
throw new RuntimeException(queryState.getErrorMessage());
}
LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
}
}

private Map<String, String> buildStatsParams(String partId) {
Map<String, String> commonParams = new HashMap<>();
String id = StatisticsUtil.constructId(tbl.getId(), -1);
@@ -20,25 +20,17 @@ package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.JdbcExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.util.StatisticsUtil;

import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class JdbcAnalysisTask extends BaseAnalysisTask {
private static final Logger LOG = LogManager.getLogger(JdbcAnalysisTask.class);

private static final String ANALYZE_SQL_TABLE_TEMPLATE = "INSERT INTO "
+ "${internalDB}.${columnStatTbl}"
+ " SELECT "
private static final String ANALYZE_SQL_TABLE_TEMPLATE = " SELECT "
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
+ "${catalogId} AS catalog_id, "
+ "${dbId} AS db_id, "
@@ -49,8 +41,8 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
+ "COUNT(1) AS row_count, "
+ "NDV(`${colName}`) AS ndv, "
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
+ "to_base64(MIN(`${colName}`)) AS min, "
+ "to_base64(MAX(`${colName}`)) AS max, "
+ "MIN(`${colName}`) AS min, "
+ "MAX(`${colName}`) AS max, "
+ "${dataSizeFunction} AS data_size, "
+ "NOW() "
+ "FROM `${catalogName}`.`${dbName}`.`${tblName}`";
@@ -117,25 +109,7 @@ public class JdbcAnalysisTask extends BaseAnalysisTask {
params.put("dataSizeFunction", getDataSizeFunction(col));
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
}

private void executeInsertSql(String sql) throws Exception {
long startTime = System.currentTimeMillis();
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
r.connectContext.getSessionVariable().disableNereidsPlannerOnce();
this.stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
this.stmtExecutor.execute();
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(QueryState.MysqlStateType.ERR)) {
LOG.warn(String.format("Failed to analyze %s.%s.%s, sql: [%s], error: [%s]",
catalog.getName(), db.getFullName(), info.colName, sql, queryState.getErrorMessage()));
throw new RuntimeException(queryState.getErrorMessage());
}
LOG.debug(String.format("Analyze %s.%s.%s done. SQL: [%s]. Cost %d ms.",
catalog.getName(), db.getFullName(), info.colName, sql, (System.currentTimeMillis() - startTime)));
}
runQuery(sql);
}

private Map<String, String> buildTableStatsParams(String partId) {
@@ -1,152 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.SelectListItem;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndexMeta;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.base.Preconditions;

import java.io.StringReader;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Analysis for the materialized view, only gets constructed when the AnalyzeStmt is not set which
 * columns to be analyzed.
 * TODO: Supports multi-table mv
 */
public class MVAnalysisTask extends BaseAnalysisTask {

private static final String ANALYZE_MV_PART = INSERT_PART_STATISTICS
+ " FROM (${sql}) mv ${sampleExpr}";

private static final String ANALYZE_MV_COL = INSERT_COL_STATISTICS
+ " (SELECT NDV(`${colName}`) AS ndv "
+ " FROM (${sql}) mv) t2";

private MaterializedIndexMeta meta;

private SelectStmt selectStmt;

private OlapTable olapTable;

public MVAnalysisTask(AnalysisInfo info) {
super(info);
init();
}

private void init() {
olapTable = (OlapTable) tbl;
meta = olapTable.getIndexMetaByIndexId(info.indexId);
Preconditions.checkState(meta != null);
String mvDef = meta.getDefineStmt().originStmt;
SqlScanner input =
new SqlScanner(new StringReader(mvDef), 0L);
SqlParser parser = new SqlParser(input);
CreateMaterializedViewStmt cmv = null;
try {
cmv = (CreateMaterializedViewStmt) SqlParserUtils.getStmt(parser, 0);
} catch (Exception e) {
throw new RuntimeException(e);
}
selectStmt = cmv.getSelectStmt();
selectStmt.getTableRefs().get(0).getName().setDb(db.getFullName());
}

@Override
public void doExecute() throws Exception {
for (Column column : meta.getSchema()) {
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
TableRef tableRef = selectOne.getTableRefs().get(0);
SelectListItem selectItem = selectOne.getSelectList().getItems()
.stream()
.filter(i -> isCorrespondingToColumn(i, column))
.findFirst()
.get();
selectItem.setAlias(column.getName());
Map<String, String> params = new HashMap<>();
for (String partName : tbl.getPartitionNames()) {
PartitionNames partitionName = new PartitionNames(false,
Collections.singletonList(partName));
tableRef.setPartitionNames(partitionName);
String sql = selectOne.toSql();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
params.put("catalogId", String.valueOf(catalog.getId()));
params.put("dbId", String.valueOf(db.getId()));
params.put("tblId", String.valueOf(tbl.getId()));
params.put("idxId", String.valueOf(meta.getIndexId()));
String colName = column.getName();
params.put("colId", colName);
String partId = olapTable.getPartition(partName) == null ? "NULL" :
String.valueOf(olapTable.getPartition(partName).getId());
params.put("partId", partId);
params.put("dataSizeFunction", getDataSizeFunction(column));
params.put("dbName", db.getFullName());
params.put("colName", colName);
params.put("tblName", tbl.getName());
params.put("sql", sql);
StatisticsUtil.execUpdate(ANALYZE_MV_PART, params);
}
params.remove("partId");
params.remove("sampleExpr");
params.put("type", column.getType().toString());
StatisticsUtil.execUpdate(ANALYZE_MV_COL, params);
Env.getCurrentEnv().getStatisticsCache()
.refreshColStatsSync(meta.getIndexId(), meta.getIndexId(), column.getName());
}
}

// Based on the fact that materialized view create statement's select expr only contains basic SlotRef and
// AggregateFunction.
private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
Expr expr = item.getExpr();
if (expr instanceof SlotRef) {
SlotRef slotRef = (SlotRef) expr;
return slotRef.getColumnName().equalsIgnoreCase(column.getName());
}
if (expr instanceof FunctionCallExpr) {
FunctionCallExpr func = (FunctionCallExpr) expr;
SlotRef slotRef = (SlotRef) func.getChild(0);
return slotRef.getColumnName().equalsIgnoreCase(column.getName());
}
return false;
}

@Override
protected void afterExecution() {
// DO NOTHING
}
}
@ -22,28 +22,21 @@ import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
import org.apache.doris.qe.QueryState;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.statistics.AnalysisInfo.JobType;
|
||||
import org.apache.doris.statistics.util.StatisticsUtil;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
|
||||
import java.security.SecureRandom;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.StringJoiner;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
@ -51,29 +44,6 @@ import java.util.stream.Collectors;
|
||||
*/
|
||||
public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
|
||||
// TODO Currently, NDV is computed for the full table; in fact,
|
||||
// NDV should only be computed for the relevant partition.
|
||||
private static final String ANALYZE_COLUMN_SQL_TEMPLATE = INSERT_COL_STATISTICS
|
||||
+ " (SELECT NDV(`${colName}`) AS ndv "
|
||||
+ " FROM `${dbName}`.`${tblName}`) t2";
|
||||
|
||||
private static final String COLLECT_PARTITION_STATS_SQL_TEMPLATE =
|
||||
" SELECT "
|
||||
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}', '-', ${partId}) AS id, "
|
||||
+ "${catalogId} AS catalog_id, "
|
||||
+ "${dbId} AS db_id, "
|
||||
+ "${tblId} AS tbl_id, "
|
||||
+ "${idxId} AS idx_id, "
|
||||
+ "'${colId}' AS col_id, "
|
||||
+ "${partId} AS part_id, "
|
||||
+ "COUNT(1) AS row_count, "
|
||||
+ "NDV(`${colName}`) AS ndv, "
|
||||
+ "SUM(CASE WHEN `${colName}` IS NULL THEN 1 ELSE 0 END) AS null_count, "
|
||||
+ "MIN(`${colName}`) AS min, "
|
||||
+ "MAX(`${colName}`) AS max, "
|
||||
+ "${dataSizeFunction} AS data_size, "
|
||||
+ "NOW() FROM `${dbName}`.`${tblName}` PARTITION ${partitionName}";
|
||||
|
||||
private static final String SAMPLE_COLUMN_SQL_TEMPLATE = "SELECT "
|
||||
+ "CONCAT(${tblId}, '-', ${idxId}, '-', '${colId}') AS id, "
|
||||
+ "${catalogId} AS catalog_id, "
|
||||
@ -92,9 +62,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
+ "FROM `${dbName}`.`${tblName}`"
|
||||
+ "${tablets}";
|
||||
|
||||
// cache stats for each partition, it would be inserted into column_statistics in a batch.
|
||||
private final List<List<ColStatsData>> buf = new ArrayList<>();
|
||||
|
||||
@VisibleForTesting
|
||||
public OlapAnalysisTask() {
|
||||
}
|
||||
@ -148,45 +115,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
stmtExecutor = new StmtExecutor(r.connectContext, stringSubstitutor.replace(SAMPLE_COLUMN_SQL_TEMPLATE));
// Scalar query only returns one row
ColStatsData colStatsData = new ColStatsData(stmtExecutor.executeInternalQuery().get(0));
OlapTable olapTable = (OlapTable) tbl;
Collection<Partition> partitions = olapTable.getPartitions();
int partitionCount = partitions.size();
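// Approximate per-partition stats by dividing the sampled table-level counts evenly across partitions.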
List<String> values = partitions.stream().map(p -> String.format(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName(), p.getId())),
InternalCatalog.INTERNAL_CATALOG_ID,
db.getId(),
tbl.getId(),
-1,
StatisticsUtil.quote(col.getName()),
p.getId(),
colStatsData.count / partitionCount,
colStatsData.ndv / partitionCount,
colStatsData.nullCount / partitionCount,
StatisticsUtil.quote(colStatsData.minLit),
StatisticsUtil.quote(colStatsData.maxLit),
colStatsData.dataSizeInBytes / partitionCount)).collect(Collectors.toList());
values.add(String.format(
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())",
StatisticsUtil.quote(StatisticsUtil.constructId(tbl.getId(), -1, col.getName())),
InternalCatalog.INTERNAL_CATALOG_ID,
db.getId(),
tbl.getId(),
-1,
StatisticsUtil.quote(col.getName()),
"NULL",
colStatsData.count,
colStatsData.ndv,
colStatsData.nullCount,
StatisticsUtil.quote(colStatsData.minLit),
StatisticsUtil.quote(colStatsData.maxLit),
colStatsData.dataSizeInBytes));
String insertSQL = "INSERT INTO "
+ StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
+ " VALUES "
+ String.join(",", values);
stmtExecutor = new StmtExecutor(r.connectContext, insertSQL);
executeWithExceptionOnFail(stmtExecutor);
job.appendBuf(this, Collections.singletonList(colStatsData));
}
}

@ -198,6 +127,7 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
protected void doFull() throws Exception {
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (partitionNames.isEmpty()) {
job.appendBuf(this, Collections.emptyList());
return;
}
Map<String, String> params = new HashMap<>();
@ -212,68 +142,14 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
params.put("dbName", db.getFullName());
params.put("colName", String.valueOf(info.colName));
params.put("tblName", String.valueOf(tbl.getName()));
List<String> partitionAnalysisSQLs = new ArrayList<>();
try {
tbl.readLock();

for (String partitionName : partitionNames) {
Partition part = tbl.getPartition(partitionName);
if (part == null) {
continue;
}
params.put("partId", String.valueOf(tbl.getPartition(partitionName).getId()));
// Avoid error when getting the default partition
params.put("partitionName", "`" + partitionName + "`");
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
partitionAnalysisSQLs.add(stringSubstitutor.replace(COLLECT_PARTITION_STATS_SQL_TEMPLATE));
}
} finally {
tbl.readUnlock();
}
execSQLs(partitionAnalysisSQLs, params);
execSQL(params);
}

@VisibleForTesting
public void execSQLs(List<String> partitionAnalysisSQLs, Map<String, String> params) throws Exception {
long startTime = System.currentTimeMillis();
LOG.debug("analyze task {} start at {}", info.toString(), new Date());
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(info.jobType.equals(JobType.SYSTEM))) {
List<List<String>> sqlGroups = Lists.partition(partitionAnalysisSQLs, StatisticConstants.UNION_ALL_LIMIT);
for (List<String> group : sqlGroups) {
if (killed) {
return;
}
StringJoiner partitionCollectSQL = new StringJoiner("UNION ALL");
group.forEach(partitionCollectSQL::add);
stmtExecutor = new StmtExecutor(r.connectContext, partitionCollectSQL.toString());
buf.add(stmtExecutor.executeInternalQuery()
.stream().map(ColStatsData::new).collect(Collectors.toList()));
QueryState queryState = r.connectContext.getState();
if (queryState.getStateType().equals(MysqlStateType.ERR)) {
throw new RuntimeException(String.format("Failed to analyze %s.%s.%s, error: %s sql: %s",
catalog.getName(), db.getFullName(), info.colName, partitionCollectSQL,
queryState.getErrorMessage()));
}
}
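// Flush the buffered per-partition stats: each buffered group is merged into a single batched INSERT.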
for (List<ColStatsData> colStatsDataList : buf) {
StringBuilder batchInsertSQL =
new StringBuilder("INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME
+ " VALUES ");
StringJoiner sj = new StringJoiner(",");
colStatsDataList.forEach(c -> sj.add(c.toSQL(true)));
batchInsertSQL.append(sj.toString());
stmtExecutor = new StmtExecutor(r.connectContext, batchInsertSQL.toString());
executeWithExceptionOnFail(stmtExecutor);
}
params.put("type", col.getType().toString());
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
stmtExecutor = new StmtExecutor(r.connectContext, sql);
executeWithExceptionOnFail(stmtExecutor);
} finally {
LOG.debug("analyze task {} end. cost {}ms", info,
System.currentTimeMillis() - startTime);
}
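// Collect table-level column stats with one query built from the COLLECT_COL_STATISTICS template.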
public void execSQL(Map<String, String> params) throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String collectColStats = stringSubstitutor.replace(COLLECT_COL_STATISTICS);
runQuery(collectColStats);
}

// Get sample tablets id and scale up scaleFactor

@ -78,12 +78,20 @@ public class StatisticConstants {

public static final int LOAD_RETRY_TIMES = 3;

// Unioning more than 512 relations may cause a StackOverflowError in the future.
public static final int UNION_ALL_LIMIT = 512;

public static final String FULL_AUTO_ANALYZE_START_TIME = "00:00:00";
public static final String FULL_AUTO_ANALYZE_END_TIME = "23:59:59";

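// Default number of stats rows merged into a single INSERT (see StatisticsUtil.getInsertMergeCount).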
public static final int INSERT_MERGE_ITEM_COUNT = 200;

public static final long HUGE_TABLE_DEFAULT_SAMPLE_ROWS = 4194304;
public static final long HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES = 5L * 1024 * 1024 * 1024;

public static final long HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS = TimeUnit.HOURS.toMillis(12);

public static final int TABLE_STATS_HEALTH_THRESHOLD = 60;

public static final int ANALYZE_TIMEOUT_IN_SEC = 43200;

static {
SYSTEM_DBS.add(SystemInfoService.DEFAULT_CLUSTER
+ ClusterNamespace.CLUSTER_DELIMITER + FeConstants.INTERNAL_DB_NAME);

@ -113,7 +113,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
if (!(table instanceof OlapTable || table instanceof ExternalTable)) {
return true;
}
if (table.getDataSize(true) < Config.huge_table_lower_bound_size_in_bytes) {
if (table.getDataSize(true) < StatisticsUtil.getHugeTableLowerBoundSizeInBytes() * 5) {
return false;
}
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(table.getId());
@ -121,12 +121,13 @@ public class StatisticsAutoCollector extends StatisticsCollector {
if (tableStats == null) {
return false;
}
return System.currentTimeMillis() - tableStats.updatedTime < Config.huge_table_auto_analyze_interval_in_millis;
return System.currentTimeMillis()
- tableStats.updatedTime < StatisticsUtil.getHugeTableAutoAnalyzeIntervalInMillis();
}

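// Build the auto-analyze job: sample huge tables, fully analyze the rest.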
protected void createAnalyzeJobForTbl(DatabaseIf<? extends TableIf> db,
List<AnalysisInfo> analysisInfos, TableIf table) {
AnalysisMethod analysisMethod = table.getDataSize(true) > Config.huge_table_lower_bound_size_in_bytes
AnalysisMethod analysisMethod = table.getDataSize(true) > StatisticsUtil.getHugeTableLowerBoundSizeInBytes()
? AnalysisMethod.SAMPLE : AnalysisMethod.FULL;
AnalysisInfo jobInfo = new AnalysisInfoBuilder()
.setJobId(Env.getCurrentEnv().getNextId())
@ -141,7 +142,7 @@ public class StatisticsAutoCollector extends StatisticsCollector {
.setAnalysisType(AnalysisInfo.AnalysisType.FUNDAMENTALS)
.setAnalysisMode(AnalysisInfo.AnalysisMode.INCREMENTAL)
.setAnalysisMethod(analysisMethod)
.setSampleRows(Config.huge_table_default_sample_rows)
.setSampleRows(StatisticsUtil.getHugeTableSampleRows())
.setScheduleType(ScheduleType.AUTOMATIC)
.setState(AnalysisState.PENDING)
.setTaskIds(new ArrayList<>())

@ -73,14 +73,15 @@ public abstract class StatisticsCollector extends MasterDaemon {
return;
}

Map<Long, BaseAnalysisTask> analysisTaskInfos = new HashMap<>();
Map<Long, BaseAnalysisTask> analysisTasks = new HashMap<>();
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.createTaskForEachColumns(jobInfo, analysisTaskInfos, false);
analysisManager.createTaskForEachColumns(jobInfo, analysisTasks, false);
Env.getCurrentEnv().getAnalysisManager().constructJob(jobInfo, analysisTasks.values());
if (StatisticsUtil.isExternalTable(jobInfo.catalogId, jobInfo.dbId, jobInfo.tblId)) {
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTaskInfos, false);
analysisManager.createTableLevelTaskForExternalTable(jobInfo, analysisTasks, false);
}
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTaskInfos);
analysisTaskInfos.values().forEach(analysisTaskExecutor::submitTask);
Env.getCurrentEnv().getAnalysisManager().registerSysJob(jobInfo, analysisTasks);
analysisTasks.values().forEach(analysisTaskExecutor::submitTask);
}

}

@ -1,50 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.concurrent.TimeUnit;

public class StatisticsPeriodCollector extends StatisticsCollector {
private static final Logger LOG = LogManager.getLogger(StatisticsPeriodCollector.class);

public StatisticsPeriodCollector() {
super("Automatic Analyzer",
TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes) / 2,
new AnalysisTaskExecutor(Config.period_analyze_simultaneously_running_task_num));
}

@Override
protected void collect() {
try {
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
List<AnalysisInfo> jobInfos = analysisManager.findPeriodicJobs();
for (AnalysisInfo jobInfo : jobInfos) {
createSystemAnalysisJob(jobInfo);
}
} catch (Exception e) {
LOG.warn("Failed to periodically analyze the statistics." + e);
}
}
}
@ -19,6 +19,8 @@ package org.apache.doris.statistics;

import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.annotations.VisibleForTesting;

import java.util.StringJoiner;

public class StatsId {
@ -34,6 +36,17 @@ public class StatsId {
// nullable
public final String partId;

@VisibleForTesting
public StatsId() {
this.id = null;
this.catalogId = -1;
this.dbId = -1;
this.tblId = -1;
this.idxId = -1;
this.colId = null;
this.partId = null;
}

public StatsId(ResultRow row) {
this.id = row.get(0);
this.catalogId = Long.parseLong(row.get(1));
@ -52,7 +65,7 @@ public class StatsId {
sj.add(String.valueOf(tblId));
sj.add(String.valueOf(idxId));
sj.add(StatisticsUtil.quote(colId));
sj.add(StatisticsUtil.quote(partId));
sj.add(partId);
return sj.toString();
}
}

@ -177,12 +177,14 @@ public class StatisticsUtil {
sessionVariable.enablePageCache = false;
sessionVariable.parallelExecInstanceNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.parallelPipelineTaskNum = Config.statistics_sql_parallel_exec_instance_num;
sessionVariable.setEnableNereidsPlanner(false);
sessionVariable.setEnableNereidsPlanner(true);
sessionVariable.setEnablePipelineEngine(false);
sessionVariable.enableProfile = false;
sessionVariable.enableScanRunSerial = limitScan;
sessionVariable.queryTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
sessionVariable.insertTimeoutS = Config.analyze_task_timeout_in_hours * 60 * 60;
sessionVariable.queryTimeoutS = StatisticsUtil.getAnalyzeTimeout();
sessionVariable.insertTimeoutS = StatisticsUtil.getAnalyzeTimeout();
sessionVariable.enableFileCache = false;
sessionVariable.forbidUnknownColStats = false;
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
connectContext.setQualifiedUser(UserIdentity.ROOT.getQualifiedUser());
@ -808,7 +810,7 @@ public class StatisticsUtil {

public static boolean inAnalyzeTime(LocalTime now) {
try {
Pair<LocalTime, LocalTime> range = findRangeFromGlobalSessionVar();
Pair<LocalTime, LocalTime> range = findConfigFromGlobalSessionVar();
if (range == null) {
return false;
}
@ -825,16 +827,16 @@ public class StatisticsUtil {
}
}

private static Pair<LocalTime, LocalTime> findRangeFromGlobalSessionVar() {
private static Pair<LocalTime, LocalTime> findConfigFromGlobalSessionVar() {
try {
String startTime =
findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_START_TIME)
.fullAutoAnalyzeStartTime;
// For compatibility
if (StringUtils.isEmpty(startTime)) {
startTime = StatisticConstants.FULL_AUTO_ANALYZE_START_TIME;
}
String endTime = findRangeFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
String endTime = findConfigFromGlobalSessionVar(SessionVariable.FULL_AUTO_ANALYZE_END_TIME)
.fullAutoAnalyzeEndTime;
if (StringUtils.isEmpty(startTime)) {
endTime = StatisticConstants.FULL_AUTO_ANALYZE_END_TIME;
@ -846,7 +848,7 @@ public class StatisticsUtil {
}
}

private static SessionVariable findRangeFromGlobalSessionVar(String varName) throws Exception {
protected static SessionVariable findConfigFromGlobalSessionVar(String varName) throws Exception {
SessionVariable sessionVariable = VariableMgr.newSessionVariable();
VariableExpr variableExpr = new VariableExpr(varName, SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
@ -855,10 +857,71 @@ public class StatisticsUtil {

public static boolean enableAutoAnalyze() {
try {
return findRangeFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
return findConfigFromGlobalSessionVar(SessionVariable.ENABLE_FULL_AUTO_ANALYZE).enableFullAutoAnalyze;
} catch (Exception e) {
LOG.warn("Fail to get value of enable auto analyze, return false by default", e);
}
return false;
}

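// The getters below read stats-related settings from GLOBAL session variables and fall back to the StatisticConstants defaults when the lookup fails.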
public static int getInsertMergeCount() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.STATS_INSERT_MERGE_ITEM_COUNT)
.statsInsertMergeItemCount;
} catch (Exception e) {
LOG.warn("Failed to get value of insert_merge_item_count, return default", e);
}
return StatisticConstants.INSERT_MERGE_ITEM_COUNT;
}

public static long getHugeTableSampleRows() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_DEFAULT_SAMPLE_ROWS)
.hugeTableDefaultSampleRows;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_default_sample_rows, return default", e);
}
return StatisticConstants.HUGE_TABLE_DEFAULT_SAMPLE_ROWS;
}

public static long getHugeTableLowerBoundSizeInBytes() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES)
.hugeTableLowerBoundSizeInBytes;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_lower_bound_size_in_bytes, return default", e);
}
return StatisticConstants.HUGE_TABLE_LOWER_BOUND_SIZE_IN_BYTES;
}

public static long getHugeTableAutoAnalyzeIntervalInMillis() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS)
.hugeTableAutoAnalyzeIntervalInMillis;
} catch (Exception e) {
LOG.warn("Failed to get value of huge_table_auto_analyze_interval_in_millis, return default", e);
}
return StatisticConstants.HUGE_TABLE_AUTO_ANALYZE_INTERVAL_IN_MILLIS;
}

public static long getTableStatsHealthThreshold() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.TABLE_STATS_HEALTH_THRESHOLD)
.tableStatsHealthThreshold;
} catch (Exception e) {
LOG.warn("Failed to get value of table_stats_health_threshold, return default", e);
}
return StatisticConstants.TABLE_STATS_HEALTH_THRESHOLD;
}

public static int getAnalyzeTimeout() {
try {
return findConfigFromGlobalSessionVar(SessionVariable.ANALYZE_TIMEOUT)
.analyzeTimeoutS;
} catch (Exception e) {
LOG.warn("Failed to get value of analyze_timeout, return default", e);
}
return StatisticConstants.ANALYZE_TIMEOUT_IN_SEC;
}

}