fix querytimeout bug
This commit is contained in:
@ -1791,15 +1791,26 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
|
|||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
int64_t query_timeout = 0;
|
int64_t query_timeout = 0;
|
||||||
ObSQLSessionInfo *session = nullptr;
|
if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) {
|
||||||
if (OB_ISNULL(session = ctx.get_my_session())) {
|
LOG_WARN("fail to get value", K(ret));
|
||||||
ret = OB_ERR_UNEXPECTED;
|
} else if (query_timeout < 0) {
|
||||||
LOG_WARN("session is null", KR(ret));
|
ret = OB_TIMEOUT;
|
||||||
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
|
LOG_WARN("session is timeout", K(ret));
|
||||||
LOG_WARN("fail to get query timeout", KR(ret));
|
} else if (0 == query_timeout) {
|
||||||
|
ObSQLSessionInfo *session = nullptr;
|
||||||
|
if (OB_ISNULL(session = ctx.get_my_session())) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("session is null", KR(ret));
|
||||||
|
} else if (OB_FAIL(session->get_query_timeout(query_timeout))) {
|
||||||
|
LOG_WARN("fail to get query timeout", KR(ret));
|
||||||
|
} else if (query_timeout <= 0) {
|
||||||
|
ret = OB_TIMEOUT;
|
||||||
|
LOG_WARN("session is timeout", K(ret));
|
||||||
|
} else {
|
||||||
|
THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
query_timeout = MAX(query_timeout, RPC_BATCH_INSERT_TIMEOUT_US);
|
THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout);
|
||||||
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + query_timeout);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2816,6 +2816,22 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm
|
|||||||
LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count));
|
LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
int64_t query_timeout = 0;
|
||||||
|
if (OB_FAIL(hint.get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) {
|
||||||
|
LOG_WARN("fail to get value", K(ret));
|
||||||
|
} else if (0 == query_timeout) {
|
||||||
|
if (OB_FAIL(ctx.get_my_session()->get_query_timeout(query_timeout))) {
|
||||||
|
LOG_WARN("fail to get query timeout", KR(ret));
|
||||||
|
} else {
|
||||||
|
query_timeout = MAX(query_timeout, RPC_BATCH_INSERT_TIMEOUT_US);
|
||||||
|
THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout);
|
||||||
|
}
|
||||||
|
} else if (query_timeout > 0) {
|
||||||
|
THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
if (OB_FAIL(parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) {
|
if (OB_FAIL(parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) {
|
||||||
LOG_WARN("fail to init parser", K(ret));
|
LOG_WARN("fail to init parser", K(ret));
|
||||||
|
|||||||
@ -348,16 +348,12 @@ int ObKMPStateMachine::init(ObIAllocator &allocator, const ObString &str)
|
|||||||
int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t reserved_us) {
|
int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t reserved_us) {
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
bool is_timeout = false;
|
bool is_timeout = false;
|
||||||
int64_t query_timeout = 0;
|
int64_t worker_query_timeout = THIS_WORKER.get_timeout_ts();
|
||||||
int64_t query_start_time = session.get_query_start_time();
|
|
||||||
int64_t current_time = ObTimeUtil::current_time();
|
int64_t current_time = ObTimeUtil::current_time();
|
||||||
|
|
||||||
if (OB_FAIL(session.get_query_timeout(query_timeout))) {
|
if (OB_FAIL(session.is_timeout(is_timeout))) {
|
||||||
LOG_WARN("fail to get query timeout", K(ret));
|
|
||||||
} else if (OB_FAIL(session.is_timeout(is_timeout))) {
|
|
||||||
LOG_WARN("get session timeout info failed", K(ret));
|
LOG_WARN("get session timeout info failed", K(ret));
|
||||||
} else if (OB_UNLIKELY(query_start_time + query_timeout
|
} else if (OB_UNLIKELY(worker_query_timeout < current_time + reserved_us)) {
|
||||||
< current_time + reserved_us)) {
|
|
||||||
ret = OB_TIMEOUT;
|
ret = OB_TIMEOUT;
|
||||||
LOG_WARN("query is timeout", K(ret));
|
LOG_WARN("query is timeout", K(ret));
|
||||||
} else if (OB_UNLIKELY(is_timeout)) {
|
} else if (OB_UNLIKELY(is_timeout)) {
|
||||||
@ -367,7 +363,7 @@ int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t res
|
|||||||
LOG_WARN("session's state is not OB_SUCCESS", K(ret));
|
LOG_WARN("session's state is not OB_SUCCESS", K(ret));
|
||||||
}
|
}
|
||||||
if (OB_FAIL(ret)) {
|
if (OB_FAIL(ret)) {
|
||||||
LOG_WARN("LOAD DATA timeout", K(ret), K(session.get_sessid()), K(query_timeout), K(query_start_time), K(current_time));
|
LOG_WARN("LOAD DATA timeout", K(ret), K(session.get_sessid()), K(worker_query_timeout), K(current_time), K(reserved_us));
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user