[improvement](stats) Add lifecycle hooks to AnalysisTask to make codes more clear (#22658)
This commit is contained in:
@ -909,7 +909,7 @@ public class AnalysisManager extends Daemon implements Writable {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
task.doExecute();
|
||||
task.execute();
|
||||
updateSyncTaskStatus(task, AnalysisState.FINISHED);
|
||||
} catch (Throwable t) {
|
||||
colNames.add(task.info.colName);
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.statistics;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
@ -33,8 +32,6 @@ import com.google.common.base.Preconditions;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public abstract class BaseAnalysisTask {
|
||||
@ -102,8 +99,6 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
protected StmtExecutor stmtExecutor;
|
||||
|
||||
protected Set<PrimitiveType> unsupportedType = new HashSet<>();
|
||||
|
||||
protected volatile boolean killed;
|
||||
|
||||
@VisibleForTesting
|
||||
@ -116,17 +111,7 @@ public abstract class BaseAnalysisTask {
|
||||
init(info);
|
||||
}
|
||||
|
||||
protected void initUnsupportedType() {
|
||||
unsupportedType.add(PrimitiveType.HLL);
|
||||
unsupportedType.add(PrimitiveType.BITMAP);
|
||||
unsupportedType.add(PrimitiveType.ARRAY);
|
||||
unsupportedType.add(PrimitiveType.MAP);
|
||||
unsupportedType.add(PrimitiveType.JSONB);
|
||||
unsupportedType.add(PrimitiveType.STRUCT);
|
||||
}
|
||||
|
||||
private void init(AnalysisInfo info) {
|
||||
initUnsupportedType();
|
||||
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
|
||||
if (catalog == null) {
|
||||
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
|
||||
@ -162,6 +147,16 @@ public abstract class BaseAnalysisTask {
|
||||
}
|
||||
|
||||
public void execute() {
|
||||
prepareExecution();
|
||||
executeWithRetry();
|
||||
afterExecution();
|
||||
}
|
||||
|
||||
protected void prepareExecution() {
|
||||
setTaskStateToRunning();
|
||||
}
|
||||
|
||||
protected void executeWithRetry() {
|
||||
int retriedTimes = 0;
|
||||
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
if (killed) {
|
||||
@ -182,6 +177,10 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
public abstract void doExecute() throws Exception;
|
||||
|
||||
protected void afterExecution() {
|
||||
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
|
||||
protected void setTaskStateToRunning() {
|
||||
Env.getCurrentEnv().getAnalysisManager()
|
||||
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
|
||||
@ -197,10 +196,6 @@ public abstract class BaseAnalysisTask {
|
||||
String.format("Job has been cancelled: %s", info.message), System.currentTimeMillis());
|
||||
}
|
||||
|
||||
public long getLastExecTime() {
|
||||
return info.lastExecTimeInMs;
|
||||
}
|
||||
|
||||
public long getJobId() {
|
||||
return info.jobId;
|
||||
}
|
||||
@ -213,10 +208,6 @@ public abstract class BaseAnalysisTask {
|
||||
return "COUNT(1) * " + column.getType().getSlotSize();
|
||||
}
|
||||
|
||||
private boolean isUnsupportedType(PrimitiveType type) {
|
||||
return unsupportedType.contains(type);
|
||||
}
|
||||
|
||||
protected String getSampleExpression() {
|
||||
if (info.analysisMethod == AnalysisMethod.FULL) {
|
||||
return "";
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
@ -106,7 +105,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
}
|
||||
|
||||
public void doExecute() throws Exception {
|
||||
setTaskStateToRunning();
|
||||
if (isTableLevelTask) {
|
||||
getTableStats();
|
||||
} else {
|
||||
@ -232,8 +230,6 @@ public class HMSAnalysisTask extends BaseAnalysisTask {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(sb.toString());
|
||||
executeInsertSql(sql);
|
||||
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(
|
||||
catalog.getId(), db.getId(), tbl.getId(), -1, col.getName());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -59,7 +59,6 @@ public class HistogramTask extends BaseAnalysisTask {
|
||||
|
||||
@Override
|
||||
public void doExecute() throws Exception {
|
||||
setTaskStateToRunning();
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("histogramStatTbl", StatisticConstants.HISTOGRAM_TBL_NAME);
|
||||
@ -80,6 +79,11 @@ public class HistogramTask extends BaseAnalysisTask {
|
||||
Env.getCurrentEnv().getStatisticsCache().refreshHistogramSync(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecution() {
|
||||
// DO NOTHING
|
||||
}
|
||||
|
||||
private String getSampleRateFunction() {
|
||||
if (info.analysisMethod == AnalysisMethod.FULL) {
|
||||
return "0";
|
||||
|
||||
@ -87,7 +87,6 @@ public class MVAnalysisTask extends BaseAnalysisTask {
|
||||
|
||||
@Override
|
||||
public void doExecute() throws Exception {
|
||||
setTaskStateToRunning();
|
||||
for (Column column : meta.getSchema()) {
|
||||
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
|
||||
TableRef tableRef = selectOne.getTableRefs().get(0);
|
||||
@ -146,4 +145,9 @@ public class MVAnalysisTask extends BaseAnalysisTask {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void afterExecution() {
|
||||
// DO NOTHING
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,7 +17,6 @@
|
||||
|
||||
package org.apache.doris.statistics;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.qe.AutoCloseConnectContext;
|
||||
@ -55,7 +54,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
}
|
||||
|
||||
public void doExecute() throws Exception {
|
||||
setTaskStateToRunning();
|
||||
Map<String, String> params = new HashMap<>();
|
||||
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
|
||||
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
|
||||
@ -93,7 +91,6 @@ public class OlapAnalysisTask extends BaseAnalysisTask {
|
||||
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
|
||||
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
|
||||
execSQL(sql);
|
||||
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
||||
Reference in New Issue
Block a user