From 48090094a2855fdf948930b4fb9b957e66125e99 Mon Sep 17 00:00:00 2001 From: coolfishchen Date: Tue, 3 Sep 2024 05:20:37 +0000 Subject: [PATCH] serialize exec context for direct load --- .../ob_table_load_control_rpc_executor.cpp | 3 +- .../ob_table_load_control_rpc_struct.cpp | 41 ++++++++++++++++++- .../ob_table_load_control_rpc_struct.h | 9 ++++ .../table_load/ob_table_load_coordinator.cpp | 5 +++ .../table_load/ob_table_load_instance.cpp | 2 +- .../table_load/ob_table_load_table_ctx.cpp | 39 ++++++++++++++++-- .../table_load/ob_table_load_table_ctx.h | 4 +- .../table_load/ob_table_load_utils.cpp | 27 ++++++++++++ src/observer/table_load/ob_table_load_utils.h | 2 + 9 files changed, 125 insertions(+), 7 deletions(-) diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp index a4b2a0583..d73fdb6ca 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_executor.cpp @@ -16,6 +16,7 @@ #include "observer/table_load/ob_table_load_service.h" #include "observer/table_load/ob_table_load_store.h" #include "observer/table_load/ob_table_load_table_ctx.h" +#include "sql/engine/ob_des_exec_context.h" namespace oceanbase { @@ -99,7 +100,7 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); - } else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) { + } else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_, arg_.des_exec_ctx_))) { LOG_WARN("fail to init table ctx", KR(ret)); } else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx, arg_.partition_id_array_, arg_.target_partition_id_array_))) { diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp index 7d4d0484a..ee4bdb873 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.cpp @@ -14,6 +14,7 @@ #include "ob_table_load_control_rpc_struct.h" #include "observer/table_load/ob_table_load_utils.h" +#include "sql/engine/ob_des_exec_context.h" namespace oceanbase { @@ -69,7 +70,10 @@ ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg() insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE), load_mode_(ObDirectLoadMode::INVALID_MODE), compressor_type_(ObCompressorType::INVALID_COMPRESSOR), - online_sample_percent_(1.) + online_sample_percent_(1.), + exec_ctx_(nullptr), + allocator_("TLD_pre_begin"), + des_exec_ctx_(nullptr) { free_session_ctx_.sessid_ = ObSQLSessionInfo::INVALID_SESSID; } @@ -82,6 +86,12 @@ ObDirectLoadControlPreBeginArg::~ObDirectLoadControlPreBeginArg() } session_info_ = nullptr; } + + if (nullptr != des_exec_ctx_) { + des_exec_ctx_->~ObDesExecContext(); + allocator_.free(des_exec_ctx_); + des_exec_ctx_ = nullptr; + } } OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg) @@ -114,6 +124,15 @@ OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg) load_mode_, compressor_type_, online_sample_percent_); + if (OB_SUCC(ret)) { + if (OB_ISNULL(exec_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("exec ctx is null", KR(ret)); + } else { + OB_UNIS_ENCODE(*exec_ctx_); + } + } + return ret; } @@ -146,6 +165,16 @@ OB_DEF_DESERIALIZE(ObDirectLoadControlPreBeginArg) load_mode_, compressor_type_, online_sample_percent_); + if (OB_SUCC(ret)) { + des_exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_); + + if (des_exec_ctx_ == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate des_exec_ctx", KR(ret)); + } else { + LST_DO_CODE(OB_UNIS_DECODE, *des_exec_ctx_); + } + } return ret; } @@ -180,6 +209,16 @@ OB_DEF_SERIALIZE_SIZE(ObDirectLoadControlPreBeginArg) load_mode_, compressor_type_, online_sample_percent_); + + if (OB_SUCC(ret)) { + if (OB_ISNULL(exec_ctx_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("exec ctx is null", KR(ret)); + } else { + OB_UNIS_ADD_LEN(*exec_ctx_); + } + } + return len; } diff --git a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h index 515d64b8c..8d49ae426 100644 --- a/src/observer/table_load/control/ob_table_load_control_rpc_struct.h +++ b/src/observer/table_load/control/ob_table_load_control_rpc_struct.h @@ -26,6 +26,12 @@ namespace oceanbase { +namespace sql +{ + class ObExecContext; + class ObDesExecContext; +} + namespace observer { enum class ObDirectLoadControlCommandType @@ -196,6 +202,9 @@ public: storage::ObDirectLoadMode::Type load_mode_; ObCompressorType compressor_type_; double online_sample_percent_; + sql::ObExecContext *exec_ctx_; + common::ObArenaAllocator allocator_; + sql::ObDesExecContext *des_exec_ctx_; }; class ObDirectLoadControlConfirmBeginArg final diff --git a/src/observer/table_load/ob_table_load_coordinator.cpp b/src/observer/table_load/ob_table_load_coordinator.cpp index d547a9faa..27a5307ec 100644 --- a/src/observer/table_load/ob_table_load_coordinator.cpp +++ b/src/observer/table_load/ob_table_load_coordinator.cpp @@ -458,6 +458,11 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_ arg.load_mode_ = param_.load_mode_; arg.compressor_type_ = param_.compressor_type_; arg.online_sample_percent_ = param_.online_sample_percent_; + arg.exec_ctx_ = arg.session_info_->get_cur_exec_ctx(); + if (arg.exec_ctx_ == nullptr) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("exec ctx is null", KR(ret)); + } for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) { const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i); const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info = diff --git a/src/observer/table_load/ob_table_load_instance.cpp b/src/observer/table_load/ob_table_load_instance.cpp index 506c9b0fa..1ba96aa2f 100644 --- a/src/observer/table_load/ob_table_load_instance.cpp +++ b/src/observer/table_load/ob_table_load_instance.cpp @@ -499,7 +499,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m, } else if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("fail to alloc table ctx", KR(ret), K(param)); - } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info))) { + } else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, execute_ctx_->exec_ctx_))) { LOG_WARN("fail to init table ctx", KR(ret)); } else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) { LOG_WARN("fail to coordinator init ctx", KR(ret)); diff --git a/src/observer/table_load/ob_table_load_table_ctx.cpp b/src/observer/table_load/ob_table_load_table_ctx.cpp index 3e677ea04..1511193d2 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.cpp +++ b/src/observer/table_load/ob_table_load_table_ctx.cpp @@ -21,6 +21,8 @@ #include "observer/table_load/ob_table_load_trans_ctx.h" #include "observer/table_load/ob_table_load_utils.h" #include "share/ob_common_rpc_proxy.h" +#include "sql/session/ob_sql_session_info.h" +#include "sql/engine/ob_des_exec_context.h" namespace oceanbase { @@ -36,6 +38,7 @@ ObTableLoadTableCtx::ObTableLoadTableCtx() store_ctx_(nullptr), job_stat_(nullptr), session_info_(nullptr), + exec_ctx_(nullptr), allocator_("TLD_TableCtx"), ref_count_(0), is_assigned_resource_(false), @@ -53,16 +56,28 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx() destroy(); } +int ObTableLoadTableCtx::new_exec_ctx() +{ + int ret = OB_SUCCESS; + exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_); + if (exec_ctx_ == nullptr) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate des_exec_ctx", KR(ret)); + } + return ret; +} + int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, - sql::ObSQLSessionInfo *session_info) + sql::ObSQLSessionInfo *session_info, + sql::ObExecContext *ctx) { int ret = OB_SUCCESS; if (IS_INIT) { ret = OB_INIT_TWICE; LOG_WARN("ObTableLoadTableCtx init twice", KR(ret)); - } else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) { + } else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info || ctx == nullptr)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param)); + LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param), KP(ctx)); } else { param_ = param; ddl_param_ = ddl_param; @@ -79,10 +94,23 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD LOG_WARN("fail to create session info", KR(ret)); } else if (OB_FAIL(ObTableLoadUtils::deep_copy(*session_info, *session_info_, allocator_))) { LOG_WARN("fail to deep copy", KR(ret)); + } else if (OB_FAIL(new_exec_ctx())) { + LOG_WARN("fail to new exec ctx", KR(ret)); + } else if (OB_FAIL(ObTableLoadUtils::deep_copy(*ctx, *exec_ctx_, allocator_))) { + LOG_WARN("fail to deep copy", KR(ret)); } else { is_inited_ = true; } } + if (OB_SUCC(ret)) { + if (exec_ctx_ == nullptr || session_info_ == nullptr) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("cur exec ctx is null", KR(ret), KP(session_info_), KP(exec_ctx_)); + } else { + ObSQLSessionInfo::ExecCtxSessionRegister(*session_info_, *exec_ctx_); + } + } + return ret; } @@ -253,6 +281,11 @@ void ObTableLoadTableCtx::destroy() observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_); session_info_ = nullptr; } + if (nullptr != exec_ctx_) { + exec_ctx_->~ObDesExecContext(); + allocator_.free(exec_ctx_); + exec_ctx_ = nullptr; + } unregister_job_stat(); is_inited_ = false; } diff --git a/src/observer/table_load/ob_table_load_table_ctx.h b/src/observer/table_load/ob_table_load_table_ctx.h index a79d8fe07..fd4380d36 100644 --- a/src/observer/table_load/ob_table_load_table_ctx.h +++ b/src/observer/table_load/ob_table_load_table_ctx.h @@ -38,7 +38,7 @@ class ObTableLoadTableCtx : public common::ObDLinkBase public: ObTableLoadTableCtx(); ~ObTableLoadTableCtx(); - int init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info); + int init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info, sql::ObExecContext *ctx); void stop(); void destroy(); bool is_valid() const { return is_inited_; } @@ -79,6 +79,7 @@ public: private: int register_job_stat(); void unregister_job_stat(); + int new_exec_ctx(); public: ObTableLoadParam param_; ObTableLoadDDLParam ddl_param_; @@ -88,6 +89,7 @@ public: sql::ObLoadDataGID gid_; sql::ObLoadDataStat *job_stat_; sql::ObSQLSessionInfo *session_info_; + sql::ObDesExecContext *exec_ctx_; sql::ObFreeSessionCtx free_session_ctx_; private: // 只在初始化的时候使用, 线程不安全 diff --git a/src/observer/table_load/ob_table_load_utils.cpp b/src/observer/table_load/ob_table_load_utils.cpp index 24879456e..926065beb 100644 --- a/src/observer/table_load/ob_table_load_utils.cpp +++ b/src/observer/table_load/ob_table_load_utils.cpp @@ -19,6 +19,7 @@ #include "sql/session/ob_sql_session_info.h" #include "storage/blocksstable/ob_datum_range.h" #include "storage/blocksstable/ob_datum_row.h" +#include "sql/engine/ob_des_exec_context.h" namespace oceanbase { @@ -189,6 +190,32 @@ int ObTableLoadUtils::deep_copy(const ObDatumRange &src, ObDatumRange &dest, ObI return ret; } +int ObTableLoadUtils::deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, ObIAllocator &allocator) +{ + int ret = OB_SUCCESS; + char *buf = nullptr; + int64_t buf_size = src.get_serialize_size(); + int64_t data_len = 0; + int64_t pos = 0; + if (OB_ISNULL(buf = static_cast(allocator.alloc(buf_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size)); + } else if (OB_FAIL(src.serialize(buf, buf_size, pos))) { + LOG_WARN("serialize session info failed", KR(ret)); + } else { + data_len = pos; + pos = 0; + } + if (OB_SUCC(ret)) { + if (OB_FAIL(dest.deserialize(buf, data_len, pos))) { + LOG_WARN("deserialize session info failed", KR(ret)); + } + } + return ret; +} + + + int ObTableLoadUtils::deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, ObIAllocator &allocator) { int ret = OB_SUCCESS; diff --git a/src/observer/table_load/ob_table_load_utils.h b/src/observer/table_load/ob_table_load_utils.h index 72a4b5a6a..d0227838f 100644 --- a/src/observer/table_load/ob_table_load_utils.h +++ b/src/observer/table_load/ob_table_load_utils.h @@ -41,6 +41,7 @@ namespace table namespace sql { class ObSQLSessionInfo; +class ObDesExecContext; } // namespace sql namespace observer { @@ -60,6 +61,7 @@ public: static int deep_copy(const blocksstable::ObDatumRowkey &src, blocksstable::ObDatumRowkey &dest, common::ObIAllocator &allocator); static int deep_copy(const blocksstable::ObDatumRange &src, blocksstable::ObDatumRange &dest, common::ObIAllocator &allocator); static int deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, common::ObIAllocator &allocator); + static int deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, common::ObIAllocator &allocator); template static int deep_copy(const table::ObTableLoadArray &src, table::ObTableLoadArray &dest, common::ObIAllocator &allocator);