diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 6f6813c2cf..254b206b1e 100755 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -821,7 +821,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, } fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true); - fragments_ctx->timeout_second = params.query_options.query_timeout; + fragments_ctx->timeout_second = params.query_options.execution_timeout; _set_scan_concurrency(params, fragments_ctx.get()); bool has_query_mem_tracker = @@ -1211,7 +1211,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, exec_fragment_params.__set_params(fragment_exec_params); TQueryOptions query_options; query_options.batch_size = params.batch_size; - query_options.query_timeout = params.query_timeout; + query_options.execution_timeout = params.execution_timeout; query_options.mem_limit = params.mem_limit; query_options.query_type = TQueryType::EXTERNAL; exec_fragment_params.__set_query_options(query_options); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index 7ded1c66fc..f5e118e381 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -90,8 +90,6 @@ public: } int query_parallel_instance_num() const { return _query_options.parallel_instance; } int max_errors() const { return _query_options.max_errors; } - int query_timeout() const { return _query_options.query_timeout; } - int insert_timeout() const { return _query_options.insert_timeout; } int execution_timeout() const { return _query_options.execution_timeout; } int max_io_buffers() const { return _query_options.max_io_buffers; } int num_scanner_threads() const { return _query_options.num_scanner_threads; } diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp b/be/src/vec/exec/scan/vmeta_scanner.cpp index 50877d5558..c63d6a5dc2 100644 --- a/be/src/vec/exec/scan/vmeta_scanner.cpp +++ b/be/src/vec/exec/scan/vmeta_scanner.cpp @@ -165,8 +165,8 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) { return Status::OK(); } - // _state->query_timeout() is seconds, change to milliseconds - int time_out = _state->query_timeout() * 1000; + // _state->execution_timeout() is seconds, change to milliseconds + int time_out = _state->execution_timeout() * 1000; TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; TFetchSchemaTableDataResult result; RETURN_IF_ERROR(ThriftRpcHelper::rpc( diff --git a/docs/en/docs/advanced/variables.md b/docs/en/docs/advanced/variables.md index a1eacd4ddf..9af59255da 100644 --- a/docs/en/docs/advanced/variables.md +++ b/docs/en/docs/advanced/variables.md @@ -616,3 +616,29 @@ Translated with www.DeepL.com/Translator (free version) | 10000000 | +--------------+ ``` + +*** + +#### Supplementary instructions on statement execution timeout control + +* Means of control + + Currently doris supports timeout control through `variable` and `user property` two systems. Both include `qeury_timeout` and `insert_timeout`. + +* Priority + + The order of priority for timeout to take effect is: `session variable` > `user property` > `global variable` > `default value` + + When a variable with a higher priority is not set, the value of the next priority is automatically adopted. + +* Related semantics + + `query_timeout` is used to control the timeout of all statements, and `insert_timeout` is specifically used to control the timeout of the INSERT statement. When the INSERT statement is executed, the timeout time will take + + The maximum value of `query_timeout` and `insert_timeout`. + + `query_timeout` and `insert_timeout` in `user property` can only be specified by the ADMIN user for the target user, and its semantics is to change the default timeout time of the specified user, and it does not have `quota` semantics. + +* Precautions + + The timeout set by `user property` needs to be triggered after the client reconnects. \ No newline at end of file diff --git a/docs/zh-CN/docs/advanced/variables.md b/docs/zh-CN/docs/advanced/variables.md index 36da2a3793..2913435a73 100644 --- a/docs/zh-CN/docs/advanced/variables.md +++ b/docs/zh-CN/docs/advanced/variables.md @@ -602,3 +602,30 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3); | 10000000 | +--------------+ ``` +*** + +#### 关于语句执行超时控制的补充说明 + +* 控制手段 + + 目前doris支持通过`variable`和`user property`两种体系来进行超时控制。其中均包含`qeury_timeout`和`insert_timeout`。 + +* 优先次序 + + 超时生效的优先级次序是:`session variable` > `user property` > `global variable` > `default value` + + 较高优先级的变量未设置时,会自动采用下一个优先级的数值。 + +* 相关语义 + + `query_timeout`用于控制所有语句的超时,`insert_timeout`特定用于控制 INSERT 语句的超时,在执行 INSERT 语句时,超时时间会取 + + `query_timeout`和`insert_timeout`中的最大值。 + + `user property`中的`query_timeout`和`insert_timeout`只能由 ADMIN 用户对目标用户予以指定,其语义在于改变被指定用户的默认超时时间, + + 并且不具备`quota`语义。 + +* 注意事项 + + `user property`设置的超时时间需要客户端重连后触发。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 73f9da6ffd..98cb6354bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -305,7 +305,7 @@ public class InsertStmt extends DdlStmt { db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); // create label and begin transaction - long timeoutSecond = ConnectContext.get().resetExecTimeoutByInsert(); + long timeoutSecond = ConnectContext.get().getExecTimeout(); if (Strings.isNullOrEmpty(label)) { label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_"); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java index e57051cb1a..073191bd51 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/MysqlLoadManager.java @@ -20,6 +20,8 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.SetVar; +import org.apache.doris.analysis.StringLiteral; import org.apache.doris.catalog.Env; import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.Config; @@ -30,6 +32,8 @@ import org.apache.doris.common.io.ByteBufferNetworkInputStream; import org.apache.doris.load.LoadJobRowResult; import org.apache.doris.mysql.MysqlSerializer; import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.VariableMgr; import org.apache.doris.system.Backend; import org.apache.doris.system.BeSelectionPolicy; import org.apache.doris.system.SystemInfoService; @@ -120,8 +124,11 @@ public class MysqlLoadManager { int oldTimeout = context.getExecTimeout(); int newTimeOut = extractTimeOut(dataDesc); if (newTimeOut > oldTimeout) { - // set exec timeout avoid by killed TimeoutChecker - context.setExecTimeout(newTimeOut); + // set query timeout avoid by killed TimeoutChecker + SessionVariable sessionVariable = context.getSessionVariable(); + sessionVariable.setIsSingleSetVar(true); + VariableMgr.setVar(sessionVariable, + new SetVar(SessionVariable.QUERY_TIMEOUT, new StringLiteral(String.valueOf(newTimeOut)))); } String token = tokenManager.acquireToken(); LOG.info("execute MySqlLoadJob for id: {}.", loadId); @@ -163,10 +170,6 @@ public class MysqlLoadManager { } } finally { loadContextMap.remove(loadId); - // revert the exec timeout - if (newTimeOut > oldTimeout) { - context.setExecTimeout(oldTimeout); - } } return loadResult; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java index 4f059c7c75..1bde95c165 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/AcceptListener.java @@ -79,6 +79,8 @@ public class AcceptListener implements ChannelListener resourceTags = this.commonProperties.getResourceTags(); long execMemLimit = this.commonProperties.getExecMemLimit(); - long queryTimeout = this.commonProperties.getQueryTimeout(); + int queryTimeout = this.commonProperties.getQueryTimeout(); + int insertTimeout = this.commonProperties.getInsertTimeout(); UserResource newResource = resource.getCopiedUserResource(); String newDefaultLoadCluster = defaultLoadCluster; @@ -311,13 +319,22 @@ public class UserProperty implements Writable { execMemLimit = getLongProperty(key, value, keyArr, PROP_EXEC_MEM_LIMIT); } else if (keyArr[0].equalsIgnoreCase(PROP_USER_QUERY_TIMEOUT)) { if (keyArr.length != 1) { - throw new DdlException(PROP_MAX_USER_CONNECTIONS + " format error"); + throw new DdlException(PROP_USER_QUERY_TIMEOUT + " format error"); } try { - queryTimeout = Long.parseLong(value); + queryTimeout = Integer.parseInt(value); } catch (NumberFormatException e) { throw new DdlException(PROP_USER_QUERY_TIMEOUT + " is not number"); } + } else if (keyArr[0].equalsIgnoreCase(PROP_USER_INSERT_TIMEOUT)) { + if (keyArr.length != 1) { + throw new DdlException(PROP_USER_INSERT_TIMEOUT + " format error"); + } + try { + insertTimeout = Integer.parseInt(value); + } catch (NumberFormatException e) { + throw new DdlException(PROP_USER_INSERT_TIMEOUT + " is not number"); + } } else { throw new DdlException("Unknown user property(" + key + ")"); } @@ -331,6 +348,7 @@ public class UserProperty implements Writable { this.commonProperties.setResourceTags(resourceTags); this.commonProperties.setExecMemLimit(execMemLimit); this.commonProperties.setQueryTimeout(queryTimeout); + this.commonProperties.setInsertTimeout(insertTimeout); resource = newResource; if (newDppConfigs.containsKey(newDefaultLoadCluster)) { defaultLoadCluster = newDefaultLoadCluster; @@ -460,6 +478,9 @@ public class UserProperty implements Writable { // query timeout result.add(Lists.newArrayList(PROP_USER_QUERY_TIMEOUT, String.valueOf(commonProperties.getQueryTimeout()))); + // insert timeout + result.add(Lists.newArrayList(PROP_USER_INSERT_TIMEOUT, String.valueOf(commonProperties.getInsertTimeout()))); + // resource tag result.add(Lists.newArrayList(PROP_RESOURCE_TAGS, Joiner.on(", ").join(commonProperties.getResourceTags()))); diff --git a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java index 79c38b5bb0..7d3e506baa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/mysql/privilege/UserPropertyMgr.java @@ -104,7 +104,7 @@ public class UserPropertyMgr implements Writable { property.update(properties); } - public long getQueryTimeout(String qualifiedUser) { + public int getQueryTimeout(String qualifiedUser) { UserProperty existProperty = propertyMap.get(qualifiedUser); existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); if (existProperty == null) { @@ -113,6 +113,15 @@ public class UserPropertyMgr implements Writable { return existProperty.getQueryTimeout(); } + public int getInsertTimeout(String qualifiedUser) { + UserProperty existProperty = propertyMap.get(qualifiedUser); + existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); + if (existProperty == null) { + return 0; + } + return existProperty.getInsertTimeout(); + } + public long getMaxConn(String qualifiedUser) { UserProperty existProperty = propertyMap.get(qualifiedUser); existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); @@ -201,7 +210,7 @@ public class UserPropertyMgr implements Writable { UserProperty existProperty = propertyMap.get(qualifiedUser); existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty); if (existProperty == null) { - return new String[]{}; + return new String[] {}; } return existProperty.getSqlBlockRules(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 0b4783dab9..66b4a496e4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -155,21 +155,18 @@ public class ConnectContext { // This context is used for SSL connection between server and mysql client. private final MysqlSslContext mysqlSslContext = new MysqlSslContext(SSL_PROTOCOL); - private long userQueryTimeout; - - /** - * the global execution timeout in seconds, currently set according to query_timeout and insert_timeout. - *

- * when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt, - * then it is set to max(executionTimeoutS, insert_timeout) using {@link #setExecTimeout(int timeout)} at - * {@link StmtExecutor}. - */ - private int executionTimeoutS; - private StatsErrorEstimator statsErrorEstimator; - public void setUserQueryTimeout(long queryTimeout) { - this.userQueryTimeout = queryTimeout; + public void setUserQueryTimeout(int queryTimeout) { + if (queryTimeout > 0) { + sessionVariable.setQueryTimeoutS(queryTimeout); + } + } + + public void setUserInsertTimeout(int insertTimeout) { + if (insertTimeout > 0) { + sessionVariable.setInsertTimeoutS(insertTimeout); + } } private StatementContext statementContext; @@ -239,8 +236,6 @@ public class ConnectContext { if (Config.use_fuzzy_session_variable) { sessionVariable.initFuzzyModeVariables(); } - // initialize executionTimeoutS to default to queryTimeout - executionTimeoutS = sessionVariable.getQueryTimeoutS(); } public boolean isTxnModel() { @@ -596,20 +591,13 @@ public class ConnectContext { killConnection = true; } } else { - long timeout; String timeoutTag = "query"; - if (userQueryTimeout > 0) { - // user set query_timeout property - timeout = userQueryTimeout * 1000L; - } else { - //to ms - timeout = executionTimeoutS * 1000L; - } - //deal with insert stmt particularly + // insert stmt particularly if (executor != null && executor.isInsertStmt()) { timeoutTag = "insert"; } - + //to ms + long timeout = getExecTimeout() * 1000L; if (delta > timeout) { LOG.warn("kill {} timeout, remote: {}, query timeout: {}", timeoutTag, getMysqlChannel().getRemoteHostPortString(), timeout); @@ -652,17 +640,20 @@ public class ConnectContext { return currentConnectedFEIp; } - public void setExecTimeout(int timeout) { - executionTimeoutS = timeout; - } - - public long resetExecTimeoutByInsert() { - executionTimeoutS = Math.max(executionTimeoutS, sessionVariable.getInsertTimeoutS()); - return executionTimeoutS; - } - + /** + * We calculate and get the exact execution timeout here, rather than setting + * execution timeout in many other places. + * + * @return exact execution timeout + */ public int getExecTimeout() { - return executionTimeoutS; + if (executor != null && executor.isInsertStmt()) { + // particular for insert stmt, we can expand other type of timeout in the same way + return Math.max(sessionVariable.getInsertTimeoutS(), sessionVariable.getQueryTimeoutS()); + } else { + // normal query stmt + return sessionVariable.getQueryTimeoutS(); + } } public class ThreadInfo { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 871b71d2dd..1be10db9ba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1502,8 +1502,8 @@ public class StmtExecutor implements ProfileWriter { InterruptedException, ExecutionException, TimeoutException { TransactionEntry txnEntry = context.getTxnEntry(); TTxnParams txnConf = txnEntry.getTxnConf(); - SessionVariable sessionVariable = ConnectContext.get().getSessionVariable(); - long timeoutSecond = ConnectContext.get().getExecTimeout(); + SessionVariable sessionVariable = context.getSessionVariable(); + long timeoutSecond = context.getExecTimeout(); TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING; Database dbObj = Env.getCurrentInternalCatalog() @@ -1572,9 +1572,6 @@ public class StmtExecutor implements ProfileWriter { } analyzeVariablesInStmt(insertStmt.getQueryStmt()); - // reset the executionTimeout since query hint maybe change the insert_timeout again - context.resetExecTimeoutByInsert(); - long createTime = System.currentTimeMillis(); Throwable throwable = null; long txnId = -1; diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java index 1b7b91c491..cde2440a20 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java @@ -279,7 +279,7 @@ public class ResourceTagQueryTest { Assert.assertEquals(1000000, execMemLimit); List> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER); - Assert.assertEquals(17, userProps.size()); + Assert.assertEquals(18, userProps.size()); } private void checkTableReplicaAllocation(OlapTable tbl) throws InterruptedException { diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java index 68e4394f67..3b9bb05fcb 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectContextTest.java @@ -139,21 +139,20 @@ public class ConnectContextTest { // sleep no time out ctx.setStartTime(); Assert.assertFalse(ctx.isKilled()); - long now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000 - 1; + long now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000L - 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); // Timeout ctx.setStartTime(); - now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000 + 1; + now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000L + 1; ctx.setExecutor(executor); ctx.checkTimeout(now); Assert.assertTrue(ctx.isKilled()); // user query timeout ctx.setStartTime(); - ctx.setUserQueryTimeout(1); - now = ctx.getStartTime() + auth.getQueryTimeout(qualifiedUser) * 1000 + 1; + now = ctx.getStartTime() + auth.getQueryTimeout(qualifiedUser) * 1000L + 1; ctx.setExecutor(executor); ctx.checkTimeout(now); Assert.assertTrue(ctx.isKilled()); @@ -176,14 +175,12 @@ public class ConnectContextTest { // sleep no time out Assert.assertFalse(ctx.isKilled()); ctx.setExecutor(executor); - ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS()); long now = ctx.getExecTimeout() * 1000L - 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); // Timeout ctx.setExecutor(executor); - ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS()); now = ctx.getExecTimeout() * 1000L + 1; ctx.checkTimeout(now); Assert.assertFalse(ctx.isKilled()); diff --git a/gensrc/thrift/DorisExternalService.thrift b/gensrc/thrift/DorisExternalService.thrift index 483df19670..8a3dc8c55f 100644 --- a/gensrc/thrift/DorisExternalService.thrift +++ b/gensrc/thrift/DorisExternalService.thrift @@ -55,7 +55,7 @@ struct TScanOpenParams { // max keep alive time min 11: optional i16 keep_alive_min - 12: optional i32 query_timeout + 12: optional i32 execution_timeout // memory limit for a single query 13: optional i64 mem_limit