[fix](stats) Fix jdbc timeout with multiple FE when execute analyze table (#21115)

A SQL statement may be forwarded to the master FE for execution when the client is connected to a follower node; in that case the result should be set to `StmtExecutor#proxyResultSet`.

Before this PR, in the above scenario, submitting an ANALYZE statement through the MySQL client or JDBC would return a malformed packet / communication failure error.
Author: AKIRA
Date: 2023-06-25 16:49:36 +09:00
Committed by: GitHub
Parent: 76bdcf1d26
Commit: dd99468b8f
7 changed files with 38 additions and 34 deletions
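
Before the file-by-file diff, a minimal, self-contained sketch of the result-proxy flow this change relies on. All names in the sketch (ProxyFlowSketch, JobResultSet, Executor, deliver) are illustrative stand-ins, not the real Doris classes; in the actual code the roles are played by `StmtExecutor#proxyResultSet`, `StmtExecutor#sendResultSet` and `AnalysisManager#sendJobId` shown in the diffs below. The sketch assumes Java 16+ for records.

// Minimal, illustrative sketch of the result-proxy flow; none of these names are real Doris classes.
import java.util.List;

public class ProxyFlowSketch {

    // Stand-in for Doris' ShowResultSet: just a list of string rows.
    record JobResultSet(List<List<String>> rows) {
    }

    // Stand-in for StmtExecutor running on the master FE.
    static class Executor {
        private final boolean isProxy;        // true when the stmt was forwarded from a follower
        private JobResultSet proxyResultSet;  // result handed back to the forwarding follower

        Executor(boolean isProxy) {
            this.isProxy = isProxy;
        }

        // Mirrors the fixed sendJobId logic: a proxied statement must not write to the
        // client connection directly, because the client is attached to the follower.
        void deliver(JobResultSet rs) {
            if (!isProxy) {
                sendToClient(rs);          // normal path: this FE talks to the client itself
            } else {
                this.proxyResultSet = rs;  // proxy path: the follower picks this up and replies
            }
        }

        void sendToClient(JobResultSet rs) {
            System.out.println("sent directly to client: " + rs.rows());
        }

        JobResultSet getProxyResultSet() {
            return proxyResultSet;
        }
    }

    public static void main(String[] args) {
        // An ANALYZE statement arrived on a follower and was forwarded to the master.
        Executor onMaster = new Executor(true);
        onMaster.deliver(new JobResultSet(List.of(List.of("job_id_123"))));

        // The follower serializes proxyResultSet back to the MySQL client; before this
        // fix it stayed empty for ANALYZE, which produced the malformed-packet error.
        System.out.println("follower replies with: " + onMaster.getProxyResultSet().rows());
    }
}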

AnalyzeStmt.java

@@ -25,7 +25,7 @@ import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
 import java.util.Map;
-public class AnalyzeStmt extends DdlStmt {
+public class AnalyzeStmt extends StatementBase {
     protected AnalyzeProperties analyzeProperties;
@@ -81,4 +81,9 @@ public class AnalyzeStmt extends DdlStmt {
     public AnalyzeProperties getAnalyzeProperties() {
         return analyzeProperties;
     }
+    @Override
+    public RedirectStatus getRedirectStatus() {
+        return RedirectStatus.FORWARD_WITH_SYNC;
+    }
 }

AnalyzeTblStmt.java

@@ -252,11 +252,6 @@ public class AnalyzeTblStmt extends AnalyzeStmt {
         return table instanceof HMSExternalTable && table.getPartitionNames().size() > partNum;
     }
-    @Override
-    public RedirectStatus getRedirectStatus() {
-        return RedirectStatus.FORWARD_NO_SYNC;
-    }
     private void checkAnalyzePriv(String dbName, String tblName) throws AnalysisException {
         if (!Env.getCurrentEnv().getAccessManager()
                 .checkTblPriv(ConnectContext.get(), dbName, tblName, PrivPredicate.SELECT)) {

RedirectStatus.java

@@ -28,7 +28,7 @@ public class RedirectStatus {
     public RedirectStatus(boolean isForwardToMaster, boolean needToWaitJournalSync) {
         this.isForwardToMaster = isForwardToMaster;
-        this.needToWaitJournalSync = needToWaitJournalSync;
+        this.needToWaitJournalSync = needToWaitJournalSync;
     }
     public boolean isForwardToMaster() {
@@ -47,7 +47,7 @@ public class RedirectStatus {
         this.needToWaitJournalSync = needToWaitJournalSync;
     }
-    public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true, false);
+    public static RedirectStatus FORWARD_NO_SYNC = new RedirectStatus(true, false);
     public static RedirectStatus FORWARD_WITH_SYNC = new RedirectStatus(true, true);
     public static RedirectStatus NO_FORWARD = new RedirectStatus(false, false);
 }

Env.java

@@ -39,7 +39,6 @@ import org.apache.doris.analysis.AlterMaterializedViewStmt;
 import org.apache.doris.analysis.AlterSystemStmt;
 import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterViewStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -211,7 +210,6 @@ import org.apache.doris.resource.Tag;
 import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.statistics.AnalysisManager;
-import org.apache.doris.statistics.AnalysisTaskScheduler;
 import org.apache.doris.statistics.StatisticsAutoAnalyzer;
 import org.apache.doris.statistics.StatisticsCache;
 import org.apache.doris.statistics.StatisticsCleaner;
@@ -5345,18 +5343,6 @@ public class Env {
         return count;
     }
-    public AnalysisTaskScheduler getAnalysisJobScheduler() {
-        return analysisManager.taskScheduler;
-    }
-    // TODO:
-    // 1. handle partition level analysis statement properly
-    // 2. support sample job
-    // 3. support period job
-    public void createAnalysisJob(AnalyzeTblStmt analyzeTblStmt) throws DdlException {
-        analysisManager.createAnalysisJob(analyzeTblStmt);
-    }
     public AnalysisManager getAnalysisManager() {
         return analysisManager;
     }

DdlExecutor.java

@@ -43,8 +43,6 @@ import org.apache.doris.analysis.AlterTableStmt;
 import org.apache.doris.analysis.AlterUserStmt;
 import org.apache.doris.analysis.AlterViewStmt;
 import org.apache.doris.analysis.AlterWorkloadGroupStmt;
-import org.apache.doris.analysis.AnalyzeDBStmt;
-import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.BackupStmt;
 import org.apache.doris.analysis.CancelAlterSystemStmt;
 import org.apache.doris.analysis.CancelAlterTableStmt;
@@ -297,8 +295,6 @@ public class DdlExecutor {
             env.getRefreshManager().handleRefreshTable((RefreshTableStmt) ddlStmt);
         } else if (ddlStmt instanceof RefreshDbStmt) {
             env.getRefreshManager().handleRefreshDb((RefreshDbStmt) ddlStmt);
-        } else if (ddlStmt instanceof AnalyzeTblStmt) {
-            env.createAnalysisJob((AnalyzeTblStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterResourceStmt) {
             env.getResourceMgr().alterResource((AlterResourceStmt) ddlStmt);
         } else if (ddlStmt instanceof AlterWorkloadGroupStmt) {
@@ -337,8 +333,6 @@ public class DdlExecutor {
             env.getAnalysisManager().dropStats((DropStatsStmt) ddlStmt);
         } else if (ddlStmt instanceof KillAnalysisJobStmt) {
             env.getAnalysisManager().handleKillAnalyzeStmt((KillAnalysisJobStmt) ddlStmt);
-        } else if (ddlStmt instanceof AnalyzeDBStmt) {
-            env.getAnalysisManager().createAnalysisJobs((AnalyzeDBStmt) ddlStmt);
         } else if (ddlStmt instanceof CleanQueryStatsStmt) {
             CleanQueryStatsStmt stmt = (CleanQueryStatsStmt) ddlStmt;
             CleanQueryStatsInfo cleanQueryStatsInfo = null;

StmtExecutor.java

@@ -736,6 +736,8 @@
             handleLockTablesStmt();
         } else if (parsedStmt instanceof UnsupportedStmt) {
             handleUnsupportedStmt();
+        } else if (parsedStmt instanceof AnalyzeStmt) {
+            handleAnalyzeStmt();
         } else {
             context.getState().setError(ErrorCode.ERR_NOT_SUPPORTED_YET, "Do not support this query.");
         }
@@ -1899,6 +1901,10 @@
         context.getState().setOk();
     }
+    private void handleAnalyzeStmt() throws DdlException {
+        context.env.getAnalysisManager().createAnalyze((AnalyzeStmt) parsedStmt, isProxy);
+    }
     // Process switch catalog
     private void handleSwitchStmt() throws AnalysisException {
         SwitchStmt switchStmt = (SwitchStmt) parsedStmt;
@@ -2537,5 +2543,10 @@
     public void setProfileType(ProfileType profileType) {
         this.profileType = profileType;
     }
+    public void setProxyResultSet(ShowResultSet proxyResultSet) {
+        this.proxyResultSet = proxyResultSet;
+    }
 }

AnalysisManager.java

@@ -18,6 +18,7 @@
 package org.apache.doris.statistics;
 import org.apache.doris.analysis.AnalyzeDBStmt;
+import org.apache.doris.analysis.AnalyzeStmt;
 import org.apache.doris.analysis.AnalyzeTblStmt;
 import org.apache.doris.analysis.DropAnalyzeJobStmt;
 import org.apache.doris.analysis.DropStatsStmt;
@@ -151,7 +152,15 @@ public class AnalysisManager extends Daemon implements Writable {
         return statisticsCache;
     }
-    public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt) throws DdlException {
+    public void createAnalyze(AnalyzeStmt analyzeStmt, boolean proxy) throws DdlException {
+        if (analyzeStmt instanceof AnalyzeDBStmt) {
+            createAnalysisJobs((AnalyzeDBStmt) analyzeStmt, proxy);
+        } else if (analyzeStmt instanceof AnalyzeTblStmt) {
+            createAnalysisJob((AnalyzeTblStmt) analyzeStmt, proxy);
+        }
+    }
+
+    public void createAnalysisJobs(AnalyzeDBStmt analyzeDBStmt, boolean proxy) throws DdlException {
         DatabaseIf<TableIf> db = analyzeDBStmt.getDb();
         List<TableIf> tbls = db.getTables();
         List<AnalysisInfo> analysisInfos = new ArrayList<>();
@@ -179,7 +188,7 @@ public class AnalysisManager extends Daemon implements Writable {
                 analysisInfos.add(buildAndAssignJob(analyzeTblStmt));
             }
             if (!analyzeDBStmt.isSync()) {
-                sendJobId(analysisInfos);
+                sendJobId(analysisInfos, proxy);
             }
         } finally {
             db.readUnlock();
@@ -188,12 +197,12 @@ public class AnalysisManager extends Daemon implements Writable {
     }
     // Each analyze stmt corresponding to an analysis job.
-    public void createAnalysisJob(AnalyzeTblStmt stmt) throws DdlException {
+    public void createAnalysisJob(AnalyzeTblStmt stmt, boolean proxy) throws DdlException {
         AnalysisInfo jobInfo = buildAndAssignJob(stmt);
         if (jobInfo == null) {
             return;
         }
-        sendJobId(ImmutableList.of(jobInfo));
+        sendJobId(ImmutableList.of(jobInfo), proxy);
     }
     @Nullable
@@ -259,7 +268,7 @@ public class AnalysisManager extends Daemon implements Writable {
         analysisTaskInfos.values().forEach(taskScheduler::schedule);
     }
-    private void sendJobId(List<AnalysisInfo> analysisInfos) {
+    private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
         List<Column> columns = new ArrayList<>();
         columns.add(new Column("Catalog_Name", ScalarType.createVarchar(1024)));
         columns.add(new Column("DB_Name", ScalarType.createVarchar(1024)));
@@ -279,7 +288,11 @@
         }
         ShowResultSet commonResultSet = new ShowResultSet(commonResultSetMetaData, resultRows);
         try {
-            ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+            if (!proxy) {
+                ConnectContext.get().getExecutor().sendResultSet(commonResultSet);
+            } else {
+                ConnectContext.get().getExecutor().setProxyResultSet(commonResultSet);
+            }
         } catch (Throwable t) {
             LOG.warn("Failed to send job id to user", t);
         }