From e38f8445ccc0dfed50a66247f510f2d420015655 Mon Sep 17 00:00:00 2001 From: suz-yang Date: Mon, 29 Jan 2024 14:11:44 +0000 Subject: [PATCH] Fix direct load use dirty phy_plan --- .../engine/cmd/ob_load_data_direct_impl.cpp | 31 ++++++++++++------- src/sql/engine/cmd/ob_load_data_direct_impl.h | 1 + 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp index f7bd876b7c..81ffe18e7b 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.cpp +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.cpp @@ -1720,25 +1720,31 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) ctx_ = &ctx; load_stmt_ = &load_stmt; const ObLoadArgument &load_args = load_stmt_->get_load_arguments(); + ObSQLSessionInfo *session = nullptr; int64_t total_line_count = 0; + if (OB_ISNULL(session = ctx.get_my_session()) || OB_ISNULL(ctx.get_stmt_factory()) || + OB_ISNULL(ctx.get_stmt_factory()->get_query_ctx())) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("ctx is unexpected", KR(ret), K(ctx)); + } else if (OB_FAIL(plan_.set_vars(ctx.get_stmt_factory()->get_query_ctx()->variables_))) { + LOG_WARN("fail to set vars", KR(ret)); + } else if (OB_FAIL(session->set_cur_phy_plan(&plan_))) { + LOG_WARN("fail to set cur phy plan", KR(ret)); + } else if (FALSE_IT(ctx.reference_my_plan(&plan_))) { + } else if (OB_FAIL(ctx.init_phy_op(1))) { + LOG_WARN("fail to init phy op", KR(ret)); + } + if (OB_SUCC(ret)) { - ObSQLSessionInfo *session = nullptr; int64_t query_timeout = 0; - if (OB_ISNULL(session = ctx.get_my_session())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session is null", KR(ret)); - } else if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) { + if (OB_FAIL(load_stmt_->get_hints().get_value(ObLoadDataHint::QUERY_TIMEOUT, query_timeout))) { LOG_WARN("fail to get value", K(ret)); } else if (query_timeout < 0) { ret = OB_TIMEOUT; LOG_WARN("session is timeout", K(ret)); } else if (0 == query_timeout) { - ObSQLSessionInfo *session = nullptr; - if (OB_ISNULL(session = ctx.get_my_session())) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("session is null", KR(ret)); - } else if (OB_FAIL(session->get_query_timeout(query_timeout))) { + if (OB_FAIL(session->get_query_timeout(query_timeout))) { LOG_WARN("fail to get query timeout", KR(ret)); } else if (query_timeout <= 0) { ret = OB_TIMEOUT; @@ -1746,7 +1752,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) } } if (OB_SUCC(ret)) { - const int64_t timeout_ts = ctx.get_my_session()->get_query_start_time() + query_timeout; + const int64_t timeout_ts = session->get_query_start_time() + query_timeout; ctx.get_physical_plan_ctx()->set_timeout_timestamp(timeout_ts); THIS_WORKER.set_timeout_ts(timeout_ts); } @@ -1829,6 +1835,9 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt) } direct_loader_.destroy(); + if (OB_NOT_NULL(session)) { + session->reset_cur_phy_plan_to_null(); + } return ret; } diff --git a/src/sql/engine/cmd/ob_load_data_direct_impl.h b/src/sql/engine/cmd/ob_load_data_direct_impl.h index 7c6d6f4e86..b91dedaf58 100644 --- a/src/sql/engine/cmd/ob_load_data_direct_impl.h +++ b/src/sql/engine/cmd/ob_load_data_direct_impl.h @@ -434,6 +434,7 @@ private: private: ObExecContext *ctx_; ObLoadDataStmt *load_stmt_; + ObPhysicalPlan plan_; LoadExecuteParam execute_param_; LoadExecuteContext execute_ctx_; observer::ObTableLoadInstance direct_loader_;