[enhancement](timeout) fix set timeout failure and simplify timeout logic (#17837)

This commit is contained in:
奕冷
2023-03-25 21:56:06 +08:00
committed by GitHub
parent 193ae352e4
commit 855852d582
17 changed files with 162 additions and 71 deletions

View File

@ -821,7 +821,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
}
fragments_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(true);
fragments_ctx->timeout_second = params.query_options.query_timeout;
fragments_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, fragments_ctx.get());
bool has_query_mem_tracker =
@ -1211,7 +1211,7 @@ Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
exec_fragment_params.__set_params(fragment_exec_params);
TQueryOptions query_options;
query_options.batch_size = params.batch_size;
query_options.query_timeout = params.query_timeout;
query_options.execution_timeout = params.execution_timeout;
query_options.mem_limit = params.mem_limit;
query_options.query_type = TQueryType::EXTERNAL;
exec_fragment_params.__set_query_options(query_options);

View File

@ -90,8 +90,6 @@ public:
}
int query_parallel_instance_num() const { return _query_options.parallel_instance; }
int max_errors() const { return _query_options.max_errors; }
int query_timeout() const { return _query_options.query_timeout; }
int insert_timeout() const { return _query_options.insert_timeout; }
int execution_timeout() const { return _query_options.execution_timeout; }
int max_io_buffers() const { return _query_options.max_io_buffers; }
int num_scanner_threads() const { return _query_options.num_scanner_threads; }

View File

@ -165,8 +165,8 @@ Status VMetaScanner::_fetch_metadata(const TMetaScanRange& meta_scan_range) {
return Status::OK();
}
// _state->query_timeout() is seconds, change to milliseconds
int time_out = _state->query_timeout() * 1000;
// _state->execution_timeout() is seconds, change to milliseconds
int time_out = _state->execution_timeout() * 1000;
TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address;
TFetchSchemaTableDataResult result;
RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(

View File

@ -616,3 +616,29 @@ Translated with www.DeepL.com/Translator (free version)
| 10000000 |
+--------------+
```
***
#### Supplementary instructions on statement execution timeout control
* Means of control
Currently Doris supports timeout control through two mechanisms: `variable` and `user property`. Both provide `query_timeout` and `insert_timeout`.
* Priority
The order of priority for timeout to take effect is: `session variable` > `user property` > `global variable` > `default value`
When a variable with a higher priority is not set, the value of the next priority is automatically adopted.
* Related semantics
`query_timeout` is used to control the timeout of all statements, and `insert_timeout` is specifically used to control the timeout of INSERT statements. When an INSERT statement is executed, the effective timeout is the maximum of `query_timeout` and `insert_timeout`.
`query_timeout` and `insert_timeout` in `user property` can only be set by an ADMIN user for a target user. Their purpose is to change the default timeout of the specified user; they do not have `quota` semantics.
* Precautions
A timeout set through `user property` takes effect only after the client reconnects.

View File

@ -602,3 +602,30 @@ SELECT /*+ SET_VAR(query_timeout = 1, enable_partition_cache=true) */ sleep(3);
| 10000000 |
+--------------+
```
***
#### 关于语句执行超时控制的补充说明
* 控制手段
目前doris支持通过`variable`和`user property`两种体系来进行超时控制。其中均包含`query_timeout`和`insert_timeout`。
* 优先次序
超时生效的优先级次序是:`session variable` > `user property` > `global variable` > `default value`
较高优先级的变量未设置时,会自动采用下一个优先级的数值。
* 相关语义
`query_timeout`用于控制所有语句的超时,`insert_timeout`特定用于控制 INSERT 语句的超时,在执行 INSERT 语句时,超时时间会取
`query_timeout`和`insert_timeout`中的最大值。
`user property`中的`query_timeout`和`insert_timeout`只能由 ADMIN 用户对目标用户予以指定,其语义在于改变被指定用户的默认超时时间,
并且不具备`quota`语义。
* 注意事项
`user property`设置的超时时间需要客户端重连后触发。

View File

@ -305,7 +305,7 @@ public class InsertStmt extends DdlStmt {
db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
// create label and begin transaction
long timeoutSecond = ConnectContext.get().resetExecTimeoutByInsert();
long timeoutSecond = ConnectContext.get().getExecTimeout();
if (Strings.isNullOrEmpty(label)) {
label = "insert_" + DebugUtil.printId(analyzer.getContext().queryId()).replace("-", "_");
}

View File

@ -20,6 +20,8 @@ package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SetVar;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.Config;
@ -30,6 +32,8 @@ import org.apache.doris.common.io.ByteBufferNetworkInputStream;
import org.apache.doris.load.LoadJobRowResult;
import org.apache.doris.mysql.MysqlSerializer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.system.SystemInfoService;
@ -120,8 +124,11 @@ public class MysqlLoadManager {
int oldTimeout = context.getExecTimeout();
int newTimeOut = extractTimeOut(dataDesc);
if (newTimeOut > oldTimeout) {
// set exec timeout avoid by killed TimeoutChecker
context.setExecTimeout(newTimeOut);
// set query timeout to avoid being killed by TimeoutChecker
SessionVariable sessionVariable = context.getSessionVariable();
sessionVariable.setIsSingleSetVar(true);
VariableMgr.setVar(sessionVariable,
new SetVar(SessionVariable.QUERY_TIMEOUT, new StringLiteral(String.valueOf(newTimeOut))));
}
String token = tokenManager.acquireToken();
LOG.info("execute MySqlLoadJob for id: {}.", loadId);
@ -163,10 +170,6 @@ public class MysqlLoadManager {
}
} finally {
loadContextMap.remove(loadId);
// revert the exec timeout
if (newTimeOut > oldTimeout) {
context.setExecTimeout(oldTimeout);
}
}
return loadResult;
}

View File

@ -79,6 +79,8 @@ public class AcceptListener implements ChannelListener<AcceptingChannel<StreamCo
context.setStartTime();
context.setUserQueryTimeout(
context.getEnv().getAuth().getQueryTimeout(context.getQualifiedUser()));
context.setUserInsertTimeout(
context.getEnv().getAuth().getInsertTimeout(context.getQualifiedUser()));
ConnectProcessor processor = new ConnectProcessor(context);
context.startAcceptQuery(processor);
} catch (AfterConnectedException e) {

View File

@ -898,7 +898,7 @@ public class Auth implements Writable {
}
}
public long getQueryTimeout(String qualifiedUser) {
public int getQueryTimeout(String qualifiedUser) {
readLock();
try {
return propertyMgr.getQueryTimeout(qualifiedUser);
@ -907,6 +907,15 @@ public class Auth implements Writable {
}
}
/**
 * Returns the insert timeout that the property manager reports for the given user.
 * The lookup runs between {@code readLock()} and {@code readUnlock()}; the unlock
 * happens in a {@code finally} clause, so it is executed even if the lookup throws.
 *
 * @param qualifiedUser user name forwarded unchanged to the property manager
 * @return the value of {@code propertyMgr.getInsertTimeout(qualifiedUser)}
 */
public int getInsertTimeout(String qualifiedUser) {
    final int insertTimeout;
    readLock();
    try {
        insertTimeout = propertyMgr.getInsertTimeout(qualifiedUser);
    } finally {
        readUnlock();
    }
    return insertTimeout;
}
public long getMaxQueryInstances(String qualifiedUser) {
readLock();
try {

View File

@ -52,7 +52,10 @@ public class CommonUserProperties implements Writable {
private long execMemLimit = -1;
@SerializedName("queryTimeout")
private long queryTimeout = -1;
private int queryTimeout = -1;
@SerializedName("insertTimeout")
private int insertTimeout = -1;
private String[] sqlBlockRulesSplit = {};
@ -114,14 +117,22 @@ public class CommonUserProperties implements Writable {
this.execMemLimit = execMemLimit;
}
public long getQueryTimeout() {
public int getQueryTimeout() {
return queryTimeout;
}
public void setQueryTimeout(long timeout) {
public void setQueryTimeout(int timeout) {
this.queryTimeout = timeout;
}
/**
 * Returns the stored per-user insert timeout.
 *
 * @return the current value of the {@code insertTimeout} field
 *         (presumably in seconds -- confirm against the session variable it feeds)
 */
public int getInsertTimeout() {
    return this.insertTimeout;
}
/**
 * Stores the per-user insert timeout.
 *
 * @param insertTimeout value assigned to the {@code insertTimeout} field as-is;
 *                      no validation is performed here
 */
public void setInsertTimeout(int insertTimeout) {
this.insertTimeout = insertTimeout;
}
public static CommonUserProperties read(DataInput in) throws IOException {
String json = Text.readString(in);
CommonUserProperties commonUserProperties = GsonUtils.GSON.fromJson(json, CommonUserProperties.class);

View File

@ -64,6 +64,8 @@ public class UserProperty implements Writable {
private static final String PROP_CPU_RESOURCE_LIMIT = "cpu_resource_limit";
private static final String PROP_EXEC_MEM_LIMIT = "exec_mem_limit";
private static final String PROP_USER_QUERY_TIMEOUT = "query_timeout";
private static final String PROP_USER_INSERT_TIMEOUT = "insert_timeout";
// advanced properties end
private static final String PROP_LOAD_CLUSTER = "load_cluster";
@ -111,6 +113,7 @@ public class UserProperty implements Writable {
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_RESOURCE_TAGS + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_EXEC_MEM_LIMIT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_QUERY_TIMEOUT + "$", Pattern.CASE_INSENSITIVE));
ADVANCED_PROPERTIES.add(Pattern.compile("^" + PROP_USER_INSERT_TIMEOUT + "$", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_QUOTA + ".", Pattern.CASE_INSENSITIVE));
COMMON_PROPERTIES.add(Pattern.compile("^" + PROP_DEFAULT_LOAD_CLUSTER + "$", Pattern.CASE_INSENSITIVE));
@ -133,10 +136,14 @@ public class UserProperty implements Writable {
return this.commonProperties.getMaxConn();
}
public long getQueryTimeout() {
public int getQueryTimeout() {
return this.commonProperties.getQueryTimeout();
}
/**
 * Returns the insert timeout held by this user's common properties.
 *
 * @return the value of {@code commonProperties.getInsertTimeout()}
 */
public int getInsertTimeout() {
    return commonProperties.getInsertTimeout();
}
public long getMaxQueryInstances() {
return commonProperties.getMaxQueryInstances(); // maxQueryInstances;
}
@ -170,7 +177,8 @@ public class UserProperty implements Writable {
int cpuResourceLimit = this.commonProperties.getCpuResourceLimit();
Set<Tag> resourceTags = this.commonProperties.getResourceTags();
long execMemLimit = this.commonProperties.getExecMemLimit();
long queryTimeout = this.commonProperties.getQueryTimeout();
int queryTimeout = this.commonProperties.getQueryTimeout();
int insertTimeout = this.commonProperties.getInsertTimeout();
UserResource newResource = resource.getCopiedUserResource();
String newDefaultLoadCluster = defaultLoadCluster;
@ -311,13 +319,22 @@ public class UserProperty implements Writable {
execMemLimit = getLongProperty(key, value, keyArr, PROP_EXEC_MEM_LIMIT);
} else if (keyArr[0].equalsIgnoreCase(PROP_USER_QUERY_TIMEOUT)) {
if (keyArr.length != 1) {
throw new DdlException(PROP_MAX_USER_CONNECTIONS + " format error");
throw new DdlException(PROP_USER_QUERY_TIMEOUT + " format error");
}
try {
queryTimeout = Long.parseLong(value);
queryTimeout = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_USER_QUERY_TIMEOUT + " is not number");
}
} else if (keyArr[0].equalsIgnoreCase(PROP_USER_INSERT_TIMEOUT)) {
if (keyArr.length != 1) {
throw new DdlException(PROP_USER_INSERT_TIMEOUT + " format error");
}
try {
insertTimeout = Integer.parseInt(value);
} catch (NumberFormatException e) {
throw new DdlException(PROP_USER_INSERT_TIMEOUT + " is not number");
}
} else {
throw new DdlException("Unknown user property(" + key + ")");
}
@ -331,6 +348,7 @@ public class UserProperty implements Writable {
this.commonProperties.setResourceTags(resourceTags);
this.commonProperties.setExecMemLimit(execMemLimit);
this.commonProperties.setQueryTimeout(queryTimeout);
this.commonProperties.setInsertTimeout(insertTimeout);
resource = newResource;
if (newDppConfigs.containsKey(newDefaultLoadCluster)) {
defaultLoadCluster = newDefaultLoadCluster;
@ -460,6 +478,9 @@ public class UserProperty implements Writable {
// query timeout
result.add(Lists.newArrayList(PROP_USER_QUERY_TIMEOUT, String.valueOf(commonProperties.getQueryTimeout())));
// insert timeout
result.add(Lists.newArrayList(PROP_USER_INSERT_TIMEOUT, String.valueOf(commonProperties.getInsertTimeout())));
// resource tag
result.add(Lists.newArrayList(PROP_RESOURCE_TAGS, Joiner.on(", ").join(commonProperties.getResourceTags())));

View File

@ -104,7 +104,7 @@ public class UserPropertyMgr implements Writable {
property.update(properties);
}
public long getQueryTimeout(String qualifiedUser) {
public int getQueryTimeout(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
@ -113,6 +113,15 @@ public class UserPropertyMgr implements Writable {
return existProperty.getQueryTimeout();
}
/**
 * Resolves the insert timeout configured for a user.
 * The user's entry is read from {@code propertyMap} and then passed through
 * {@code getLdapPropertyIfNull}; if the result is still {@code null}, 0 is returned.
 *
 * @param qualifiedUser key used for the property lookup
 * @return the user's insert timeout, or 0 when no property object was resolved
 */
public int getInsertTimeout(String qualifiedUser) {
    UserProperty userProperty = getLdapPropertyIfNull(qualifiedUser, propertyMap.get(qualifiedUser));
    return userProperty == null ? 0 : userProperty.getInsertTimeout();
}
public long getMaxConn(String qualifiedUser) {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
@ -201,7 +210,7 @@ public class UserPropertyMgr implements Writable {
UserProperty existProperty = propertyMap.get(qualifiedUser);
existProperty = getLdapPropertyIfNull(qualifiedUser, existProperty);
if (existProperty == null) {
return new String[]{};
return new String[] {};
}
return existProperty.getSqlBlockRules();
}

View File

@ -155,21 +155,18 @@ public class ConnectContext {
// This context is used for SSL connection between server and mysql client.
private final MysqlSslContext mysqlSslContext = new MysqlSslContext(SSL_PROTOCOL);
private long userQueryTimeout;
/**
* the global execution timeout in seconds, currently set according to query_timeout and insert_timeout.
* <p>
* when a connection is established, exec_timeout is set by query_timeout, when the statement is an insert stmt,
* then it is set to max(executionTimeoutS, insert_timeout) using {@link #setExecTimeout(int timeout)} at
* {@link StmtExecutor}.
*/
private int executionTimeoutS;
private StatsErrorEstimator statsErrorEstimator;
public void setUserQueryTimeout(long queryTimeout) {
this.userQueryTimeout = queryTimeout;
/**
 * Applies a user-level query timeout to this connection's session variable.
 * Non-positive values are ignored, leaving the session's current query timeout untouched.
 *
 * @param queryTimeout timeout to apply; only used when greater than 0
 */
public void setUserQueryTimeout(int queryTimeout) {
    if (queryTimeout <= 0) {
        // nothing usable was supplied: keep whatever the session variable already holds
        return;
    }
    sessionVariable.setQueryTimeoutS(queryTimeout);
}
/**
 * Applies a user-level insert timeout to this connection's session variable.
 * Non-positive values are ignored, leaving the session's current insert timeout untouched.
 *
 * @param insertTimeout timeout to apply; only used when greater than 0
 */
public void setUserInsertTimeout(int insertTimeout) {
    if (insertTimeout <= 0) {
        // nothing usable was supplied: keep whatever the session variable already holds
        return;
    }
    sessionVariable.setInsertTimeoutS(insertTimeout);
}
private StatementContext statementContext;
@ -239,8 +236,6 @@ public class ConnectContext {
if (Config.use_fuzzy_session_variable) {
sessionVariable.initFuzzyModeVariables();
}
// initialize executionTimeoutS to default to queryTimeout
executionTimeoutS = sessionVariable.getQueryTimeoutS();
}
public boolean isTxnModel() {
@ -596,20 +591,13 @@ public class ConnectContext {
killConnection = true;
}
} else {
long timeout;
String timeoutTag = "query";
if (userQueryTimeout > 0) {
// user set query_timeout property
timeout = userQueryTimeout * 1000L;
} else {
//to ms
timeout = executionTimeoutS * 1000L;
}
//deal with insert stmt particularly
// insert stmt particularly
if (executor != null && executor.isInsertStmt()) {
timeoutTag = "insert";
}
//to ms
long timeout = getExecTimeout() * 1000L;
if (delta > timeout) {
LOG.warn("kill {} timeout, remote: {}, query timeout: {}",
timeoutTag, getMysqlChannel().getRemoteHostPortString(), timeout);
@ -652,17 +640,20 @@ public class ConnectContext {
return currentConnectedFEIp;
}
public void setExecTimeout(int timeout) {
executionTimeoutS = timeout;
}
public long resetExecTimeoutByInsert() {
executionTimeoutS = Math.max(executionTimeoutS, sessionVariable.getInsertTimeoutS());
return executionTimeoutS;
}
/**
* We calculate and get the exact execution timeout here, rather than setting
* execution timeout in many other places.
*
* @return exact execution timeout
*/
public int getExecTimeout() {
return executionTimeoutS;
if (executor != null && executor.isInsertStmt()) {
// particular for insert stmt, we can expand other type of timeout in the same way
return Math.max(sessionVariable.getInsertTimeoutS(), sessionVariable.getQueryTimeoutS());
} else {
// normal query stmt
return sessionVariable.getQueryTimeoutS();
}
}
public class ThreadInfo {

View File

@ -1502,8 +1502,8 @@ public class StmtExecutor implements ProfileWriter {
InterruptedException, ExecutionException, TimeoutException {
TransactionEntry txnEntry = context.getTxnEntry();
TTxnParams txnConf = txnEntry.getTxnConf();
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
long timeoutSecond = ConnectContext.get().getExecTimeout();
SessionVariable sessionVariable = context.getSessionVariable();
long timeoutSecond = context.getExecTimeout();
TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
Database dbObj = Env.getCurrentInternalCatalog()
@ -1572,9 +1572,6 @@ public class StmtExecutor implements ProfileWriter {
}
analyzeVariablesInStmt(insertStmt.getQueryStmt());
// reset the executionTimeout since query hint maybe change the insert_timeout again
context.resetExecTimeoutByInsert();
long createTime = System.currentTimeMillis();
Throwable throwable = null;
long txnId = -1;

View File

@ -279,7 +279,7 @@ public class ResourceTagQueryTest {
Assert.assertEquals(1000000, execMemLimit);
List<List<String>> userProps = Env.getCurrentEnv().getAuth().getUserProperties(Auth.ROOT_USER);
Assert.assertEquals(17, userProps.size());
Assert.assertEquals(18, userProps.size());
}
private void checkTableReplicaAllocation(OlapTable tbl) throws InterruptedException {

View File

@ -139,21 +139,20 @@ public class ConnectContextTest {
// sleep no time out
ctx.setStartTime();
Assert.assertFalse(ctx.isKilled());
long now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000 - 1;
long now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000L - 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
// Timeout
ctx.setStartTime();
now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000 + 1;
now = ctx.getStartTime() + ctx.getSessionVariable().getWaitTimeoutS() * 1000L + 1;
ctx.setExecutor(executor);
ctx.checkTimeout(now);
Assert.assertTrue(ctx.isKilled());
// user query timeout
ctx.setStartTime();
ctx.setUserQueryTimeout(1);
now = ctx.getStartTime() + auth.getQueryTimeout(qualifiedUser) * 1000 + 1;
now = ctx.getStartTime() + auth.getQueryTimeout(qualifiedUser) * 1000L + 1;
ctx.setExecutor(executor);
ctx.checkTimeout(now);
Assert.assertTrue(ctx.isKilled());
@ -176,14 +175,12 @@ public class ConnectContextTest {
// sleep no time out
Assert.assertFalse(ctx.isKilled());
ctx.setExecutor(executor);
ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
long now = ctx.getExecTimeout() * 1000L - 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());
// Timeout
ctx.setExecutor(executor);
ctx.setExecTimeout(ctx.getSessionVariable().getInsertTimeoutS());
now = ctx.getExecTimeout() * 1000L + 1;
ctx.checkTimeout(now);
Assert.assertFalse(ctx.isKilled());

View File

@ -55,7 +55,7 @@ struct TScanOpenParams {
// max keep alive time min
11: optional i16 keep_alive_min
12: optional i32 query_timeout
12: optional i32 execution_timeout
// memory limit for a single query
13: optional i64 mem_limit