[minor](stats) Throw error when sync analyze failed (#27845)
This commit is contained in:
@ -119,7 +119,7 @@ public class AnalysisJob {
|
||||
if (killed) {
|
||||
return;
|
||||
}
|
||||
// buf could be empty when nothing need to do, for example user submit an analysis task for table with no data
|
||||
// buf could be empty when nothing need to do,r for example user submit an analysis task for table with no data
|
||||
// change
|
||||
if (!buf.isEmpty()) {
|
||||
String insertStmt = "INSERT INTO " + StatisticConstants.FULL_QUALIFIED_STATS_TBL_NAME + " VALUES ";
|
||||
@ -128,28 +128,17 @@ public class AnalysisJob {
|
||||
values.add(data.toSQL(true));
|
||||
}
|
||||
insertStmt += values.toString();
|
||||
int retryTimes = 0;
|
||||
while (retryTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
if (killed) {
|
||||
return;
|
||||
}
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
|
||||
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
|
||||
executeWithExceptionOnFail(stmtExecutor);
|
||||
break;
|
||||
} catch (Exception t) {
|
||||
LOG.warn("Failed to write buf: " + insertStmt, t);
|
||||
retryTimes++;
|
||||
if (retryTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
updateTaskState(AnalysisState.FAILED, t.getMessage());
|
||||
return;
|
||||
}
|
||||
}
|
||||
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
|
||||
stmtExecutor = new StmtExecutor(r.connectContext, insertStmt);
|
||||
executeWithExceptionOnFail(stmtExecutor);
|
||||
} catch (Exception t) {
|
||||
throw new RuntimeException("Failed to analyze: " + t.getMessage());
|
||||
}
|
||||
}
|
||||
updateTaskState(AnalysisState.FINISHED, "");
|
||||
syncLoadStats();
|
||||
queryFinished.clear();
|
||||
buf.clear();
|
||||
}
|
||||
|
||||
protected void executeWithExceptionOnFail(StmtExecutor stmtExecutor) throws Exception {
|
||||
|
||||
@ -181,7 +181,7 @@ public abstract class BaseAnalysisTask {
|
||||
|
||||
protected void executeWithRetry() {
|
||||
int retriedTimes = 0;
|
||||
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
while (retriedTimes < StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
if (killed) {
|
||||
break;
|
||||
}
|
||||
@ -193,7 +193,7 @@ public abstract class BaseAnalysisTask {
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
LOG.warn("Failed to execute analysis task, retried times: {}", retriedTimes++, t);
|
||||
if (retriedTimes > StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
if (retriedTimes >= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
|
||||
job.taskFailed(this, t.getMessage());
|
||||
throw new RuntimeException(t);
|
||||
}
|
||||
|
||||
@ -75,11 +75,20 @@ public class StatisticsCleaner extends MasterDaemon {
|
||||
}
|
||||
|
||||
public synchronized void clear() {
|
||||
if (!init()) {
|
||||
return;
|
||||
try {
|
||||
if (!init()) {
|
||||
return;
|
||||
}
|
||||
clearStats(colStatsTbl);
|
||||
clearStats(histStatsTbl);
|
||||
} finally {
|
||||
colStatsTbl = null;
|
||||
histStatsTbl = null;
|
||||
idToCatalog = null;
|
||||
idToDb = null;
|
||||
idToTbl = null;
|
||||
idToMVIdx = null;
|
||||
}
|
||||
clearStats(colStatsTbl);
|
||||
clearStats(histStatsTbl);
|
||||
}
|
||||
|
||||
private void clearStats(OlapTable statsTbl) {
|
||||
|
||||
Reference in New Issue
Block a user