diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index a0d76b14b..fc16156b4 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -537,10 +537,10 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ arg.load_mode_ = param_.load_mode_; arg.compressor_type_ = param_.compressor_type_; arg.online_sample_percent_ = param_.online_sample_percent_; - const ObExecContext *exec_ctx = arg.session_info_->get_cur_exec_ctx(); - if (exec_ctx == nullptr) { - //do nothing - } else if (OB_FAIL(arg.set_exec_ctx_serialized_str(*exec_ctx))) { + if (ctx_->exec_ctx_ == nullptr) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("exec ctx must not be nullptr", KR(ret)); + } else if (OB_FAIL(arg.set_exec_ctx_serialized_str(*(ctx_->exec_ctx_)))) { LOG_WARN("fail to set exec ctx", KR(ret)); } diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 63eb51b1c..91257dc51 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -500,7 +500,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, LOG_WARN("unexpected table ctx is not null", KR(ret)); } else if (OB_FAIL(ObTableLoadService::alloc_ctx(table_ctx))) { LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); - } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string("")))) { + } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string(""), execute_ctx_->exec_ctx_))) { LOG_WARN("fail to init table ctx", KR(ret)); } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) { LOG_WARN("fail to coordinator init ctx", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 89b1161c9..c83bc7d86 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -45,7 +45,8 @@ ObTableLoadTableCtx::ObTableLoadTableCtx() is_assigned_resource_(false), is_assigned_memory_(false), mark_delete_(false), - is_inited_(false) + is_inited_(false), + des_exec_ctx_(nullptr) { free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID; allocator_.set_tenant_id(MTL_ID()); @@ -56,29 +57,31 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx() destroy(); } -int ObTableLoadTableCtx::new_exec_ctx(const ObString &exec_ctx_serialized_str) +int ObTableLoadTableCtx::new_exec_ctx(const ObString &des_exec_ctx_serialized_str) { int ret = OB_SUCCESS; - if (!exec_ctx_serialized_str.empty()) { + if (!des_exec_ctx_serialized_str.empty()) { ObString tmp_str; - exec_ctx_ = OB_NEWx(ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_); + des_exec_ctx_ = OB_NEWx(ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_); int64_t pos = 0; - if (exec_ctx_ == nullptr) { + if (des_exec_ctx_ == nullptr) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to deserialize exe ctx", KR(ret)); - } else if (OB_FAIL(ob_write_string(allocator_, exec_ctx_serialized_str, tmp_str))) { + } else if (OB_FAIL(ob_write_string(allocator_, des_exec_ctx_serialized_str, tmp_str))) { LOG_WARN("fail to copy string", KR(ret)); - } else if (OB_FAIL(exec_ctx_->deserialize(tmp_str.ptr(), tmp_str.length(), pos))) { + } else if (OB_FAIL(des_exec_ctx_->deserialize(tmp_str.ptr(), tmp_str.length(), pos))) { LOG_WARN("fail to deserialize exec ctx", KR(ret)); } } return ret; } -int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, - sql::ObSQLSessionInfo *session_info, - const common::ObString &exec_ctx_serialized_str) +int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, + const ObTableLoadDDLParam &ddl_param, + sql::ObSQLSessionInfo *session_info, + const common::ObString &des_exec_ctx_serialized_str, + ObExecContext *exec_ctx) { int ret = OB_SUCCESS; if (IS_INIT) { @@ -87,6 +90,10 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD } else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param)); + } else if ((des_exec_ctx_serialized_str.empty() && exec_ctx == nullptr) + || (!des_exec_ctx_serialized_str.empty() && exec_ctx != nullptr)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid args", KR(ret), K(des_exec_ctx_serialized_str.empty()), KP(exec_ctx)); } else { param_ = param; ddl_param_ = ddl_param; @@ -103,16 +110,18 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD LOG_WARN("fail to create session info", KR(ret)); } else if (OB_FAIL(ObTableLoadUtils::deep_copy(*session_info, *session_info_, allocator_))) { LOG_WARN("fail to deep copy", KR(ret)); - } else if (OB_FAIL(new_exec_ctx(exec_ctx_serialized_str))) { + } else if (OB_FAIL(new_exec_ctx(des_exec_ctx_serialized_str))) { LOG_WARN("fail to new exec ctx", KR(ret)); - } else { - is_inited_ = true; } } + if (OB_SUCC(ret)) { - if (exec_ctx_ != nullptr && session_info_ != nullptr) { - ObSQLSessionInfo::ExecCtxSessionRegister(*session_info_, *exec_ctx_); + if (exec_ctx != nullptr) { + exec_ctx_ = exec_ctx; + } else { + exec_ctx_ = des_exec_ctx_; } + is_inited_ = true; } return ret; @@ -281,10 +290,10 @@ void ObTableLoadTableCtx::destroy() allocator_.free(store_ctx_); store_ctx_ = nullptr; } - if (nullptr != exec_ctx_) { - exec_ctx_->~ObDesExecContext(); - allocator_.free(exec_ctx_); - exec_ctx_ = nullptr; + if (nullptr != des_exec_ctx_) { + des_exec_ctx_->~ObDesExecContext(); + allocator_.free(des_exec_ctx_); + des_exec_ctx_ = nullptr; } if (nullptr != session_info_) { observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_); diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index 754eca330..ed262a52e 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -38,7 +38,11 @@ class ObTableLoadTableCtx : public common::ObDLinkBase public: ObTableLoadTableCtx(); ~ObTableLoadTableCtx(); - int init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info, const common::ObString &exec_ctx_serialized_str); + int init(const ObTableLoadParam ¶m, + const ObTableLoadDDLParam &ddl_param, + sql::ObSQLSessionInfo *session_info, + const common::ObString &exec_ctx_serialized_str, + sql::ObExecContext *exec_ctx = nullptr); void stop(); void destroy(); bool is_valid() const { return is_inited_; } @@ -89,7 +93,7 @@ public: sql::ObLoadDataGID gid_; sql::ObLoadDataStat *job_stat_; sql::ObSQLSessionInfo *session_info_; - sql::ObDesExecContext *exec_ctx_; + sql::ObExecContext *exec_ctx_; sql::ObFreeSessionCtx free_session_ctx_; private: // 只在初始化的时候使用, 线程不安全 @@ -102,6 +106,7 @@ private: bool is_assigned_memory_; bool mark_delete_; bool is_inited_; + sql::ObDesExecContext *des_exec_ctx_; DISALLOW_COPY_AND_ASSIGN(ObTableLoadTableCtx); };