[fix](MTMV) Reset insert timeout in handleInsert (#17249)
In #16343, we split the timeout variable into two ones (one is for query and another is for insertion). The function `ConnectProcessor::handleQuery` uses the corresponding session variable to change the timeout for the queries requested by MySQL client. However, the function `StmtExecutor::handleInsert` doesn't use the session variable to change the timeout, so we can't change the timeout for the CTAS and MTMV insertion job.
This commit is contained in:
@ -305,7 +305,7 @@ public class InsertStmt extends DdlStmt {
|
||||
|
||||
db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
|
||||
// create label and begin transaction
|
||||
long timeoutSecond = ConnectContext.get().getExecTimeout();
|
||||
long timeoutSecond = ConnectContext.get().resetExecTimeoutByInsert();
|
||||
if (Strings.isNullOrEmpty(label)) {
|
||||
label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
|
||||
}
|
||||
|
||||
@ -155,8 +155,8 @@ public class ConnectContext {
|
||||
* the global execution timeout in seconds, currently set according to query_timeout and insert_timeout.
|
||||
* <p>
|
||||
* when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt,
|
||||
* then it is set to max(query_timeout, insert_timeout) with {@link #resetExecTimeout()} in
|
||||
* after the StmtExecutor is specified.
|
||||
* then it is set to max(executionTimeoutS, insert_timeout) using {@link #setExecTimeout(int timeout)} at
|
||||
* {@link StmtExecutor}.
|
||||
*/
|
||||
private int executionTimeoutS;
|
||||
|
||||
@ -640,12 +640,13 @@ public class ConnectContext {
|
||||
return currentConnectedFEIp;
|
||||
}
|
||||
|
||||
public void resetExecTimeout() {
|
||||
if (executor != null && executor.isInsertStmt()) {
|
||||
// particular timeout for insert stmt, we can make other particular timeout in the same way.
|
||||
// set the execution timeout as max(insert_timeout,query_timeout) to be compatible with older versions
|
||||
executionTimeoutS = Math.max(sessionVariable.getInsertTimeoutS(), executionTimeoutS);
|
||||
}
|
||||
public void setExecTimeout(int timeout) {
|
||||
executionTimeoutS = timeout;
|
||||
}
|
||||
|
||||
public long resetExecTimeoutByInsert() {
|
||||
executionTimeoutS = Math.max(executionTimeoutS, sessionVariable.getInsertTimeoutS());
|
||||
return executionTimeoutS;
|
||||
}
|
||||
|
||||
public int getExecTimeout() {
|
||||
|
||||
@ -408,8 +408,6 @@ public class ConnectProcessor {
|
||||
parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
|
||||
executor = new StmtExecutor(ctx, parsedStmt);
|
||||
ctx.setExecutor(executor);
|
||||
// reset the executionTimeout corresponding with the StmtExecutor
|
||||
ctx.resetExecTimeout();
|
||||
|
||||
try {
|
||||
executor.execute();
|
||||
|
||||
@ -1489,7 +1489,7 @@ public class StmtExecutor implements ProfileWriter {
|
||||
InterruptedException, ExecutionException, TimeoutException {
|
||||
TransactionEntry txnEntry = context.getTxnEntry();
|
||||
TTxnParams txnConf = txnEntry.getTxnConf();
|
||||
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
|
||||
long timeoutSecond = ConnectContext.get().getExecTimeout();
|
||||
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
|
||||
Database dbObj = Env.getCurrentInternalCatalog()
|
||||
.getDbOrException(dbName, s -> new TException("database is invalid for dbName: " + s));
|
||||
@ -1550,6 +1550,8 @@ public class StmtExecutor implements ProfileWriter {
|
||||
}
|
||||
|
||||
analyzeVariablesInStmt(insertStmt.getQueryStmt());
|
||||
// reset the executionTimeout since query hint maybe change the insert_timeout again
|
||||
context.resetExecTimeoutByInsert();
|
||||
|
||||
long createTime = System.currentTimeMillis();
|
||||
Throwable throwable = null;
|
||||
|
||||
@ -176,14 +176,14 @@ public class ConnectContextTest {
|
||||
// sleep no time out
|
||||
Assert.assertFalse(ctx.isKilled());
|
||||
ctx.setExecutor(executor);
|
||||
ctx.resetExecTimeout();
|
||||
ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
|
||||
long now = ctx.getExecTimeout() * 1000L - 1;
|
||||
ctx.checkTimeout(now);
|
||||
Assert.assertFalse(ctx.isKilled());
|
||||
|
||||
// Timeout
|
||||
ctx.setExecutor(executor);
|
||||
ctx.resetExecTimeout();
|
||||
ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
|
||||
now = ctx.getExecTimeout() * 1000L + 1;
|
||||
ctx.checkTimeout(now);
|
||||
Assert.assertFalse(ctx.isKilled());
|
||||
|
||||
Reference in New Issue
Block a user