diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index 504b9d6454..3ccb767a9d 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1791,15 +1791,26 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) if (OB_SUCC(ret)) { int64_t query_timeout = 0; - ObSQLSessionInfo *session = nullptr; - if (OB_ISNULL(session = ctx.get_my_session())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session is null", KR(ret)); - } else if (OB_FAIL(session->get_query_timeout(query_timeout))) { - LOG_WARN("fail to get query timeout", KR(ret)); + if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) { + LOG_WARN("fail to get value", K(ret)); + } else if (query_timeout < 0) { + ret = OB_TIMEOUT; + LOG_WARN("session is timeout", K(ret)); + } else if (0 == query_timeout) { + ObSQLSessionInfo *session = nullptr; + if (OB_ISNULL(session = ctx.get_my_session())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("session is null", KR(ret)); + } else if (OB_FAIL(session->get_query_timeout(query_timeout))) { + LOG_WARN("fail to get query timeout", KR(ret)); + } else if (query_timeout <= 0) { + ret = OB_TIMEOUT; + LOG_WARN("session is timeout", K(ret)); + } else { + THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout); + } } else { - query_timeout = MAX(query_timeout, RPC_BATCH_INSERT_TIMEOUT_US); - THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + query_timeout); + THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout); } } diff --git a/src/sql/engine/cmd/ob_load_data_impl.cpp b/src/sql/engine/cmd/ob_load_data_impl.cpp index b92ef29b5e..3151e39d03 100644 --- a/src/sql/engine/cmd/ob_load_data_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_impl.cpp @@ -2816,6 +2816,22 @@ int ObLoadDataSPImpl::ToolBox::init(ObExecContext &ctx, ObLoadDataStmt &load_stm LOG_DEBUG("batch size", K(hint_batch_size), K(batch_row_count)); } + if (OB_SUCC(ret)) { + int64_t query_timeout = 0; + if (OB_FAIL(hint.get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) { + LOG_WARN("fail to get value", K(ret)); + } else if (0 == query_timeout) { + if (OB_FAIL(ctx.get_my_session()->get_query_timeout(query_timeout))) { + LOG_WARN("fail to get query timeout", KR(ret)); + } else { + query_timeout = MAX(query_timeout, RPC_BATCH_INSERT_TIMEOUT_US); + THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout); + } + } else if (query_timeout > 0) { + THIS_WORKER.set_timeout_ts(ctx.get_my_session()->get_query_start_time() + query_timeout); + } + } + if (OB_SUCC(ret)) { if (OB_FAIL(parser.init(file_formats, num_of_file_column, load_args.file_cs_type_))) { LOG_WARN("fail to init parser", K(ret)); diff --git a/src/sql/engine/cmd/ob_load_data_utils.cpp b/src/sql/engine/cmd/ob_load_data_utils.cpp index cd0cfb2ef4..b6027fe172 100644 --- a/src/sql/engine/cmd/ob_load_data_utils.cpp +++ b/src/sql/engine/cmd/ob_load_data_utils.cpp @@ -348,16 +348,12 @@ int ObKMPStateMachine::init(ObIAllocator &allocator, const ObString &str) int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t reserved_us) { int ret = OB_SUCCESS; bool is_timeout = false; - int64_t query_timeout = 0; - int64_t query_start_time = session.get_query_start_time(); + int64_t worker_query_timeout = THIS_WORKER.get_timeout_ts(); int64_t current_time = ObTimeUtil::current_time(); - if (OB_FAIL(session.get_query_timeout(query_timeout))) { - LOG_WARN("fail to get query timeout", K(ret)); - } else if (OB_FAIL(session.is_timeout(is_timeout))) { + if (OB_FAIL(session.is_timeout(is_timeout))) { LOG_WARN("get session timeout info failed", K(ret)); - } else if (OB_UNLIKELY(query_start_time + query_timeout - < current_time + reserved_us)) { + } else if (OB_UNLIKELY(worker_query_timeout < current_time + reserved_us)) { ret = OB_TIMEOUT; LOG_WARN("query is timeout", K(ret)); } else if (OB_UNLIKELY(is_timeout)) { @@ -367,7 +363,7 @@ int ObLoadDataUtils::check_session_status(ObSQLSessionInfo &session, int64_t res LOG_WARN("session's state is not OB_SUCCESS", K(ret)); } if (OB_FAIL(ret)) { - LOG_WARN("LOAD DATA timeout", K(ret), K(session.get_sessid()), K(query_timeout), K(query_start_time), K(current_time)); + LOG_WARN("LOAD DATA timeout", K(ret), K(session.get_sessid()), K(worker_query_timeout), K(current_time), K(reserved_us)); } return ret; }