serialize exec context for direct load

This commit is contained in:
coolfishchen 2024-09-03 05:20:37 +00:00 committed by ob-robot
parent c1d8e1ad80
commit 48090094a2
9 changed files with 125 additions and 7 deletions

View File

@ -16,6 +16,7 @@
#include "observer/table_load/ob_table_load_service.h"
#include "observer/table_load/ob_table_load_store.h"
#include "observer/table_load/ob_table_load_table_ctx.h"
#include "sql/engine/ob_des_exec_context.h"
namespace oceanbase
{
@ -99,7 +100,7 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) {
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_, arg_.des_exec_ctx_))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx, arg_.partition_id_array_,
arg_.target_partition_id_array_))) {

View File

@ -14,6 +14,7 @@
#include "ob_table_load_control_rpc_struct.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "sql/engine/ob_des_exec_context.h"
namespace oceanbase
{
@ -69,7 +70,10 @@ ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg()
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
load_mode_(ObDirectLoadMode::INVALID_MODE),
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
online_sample_percent_(1.)
online_sample_percent_(1.),
exec_ctx_(nullptr),
allocator_("TLD_pre_begin"),
des_exec_ctx_(nullptr)
{
free_session_ctx_.sessid_ = ObSQLSessionInfo::INVALID_SESSID;
}
@ -82,6 +86,12 @@ ObDirectLoadControlPreBeginArg::~ObDirectLoadControlPreBeginArg()
}
session_info_ = nullptr;
}
if (nullptr != des_exec_ctx_) {
des_exec_ctx_->~ObDesExecContext();
allocator_.free(des_exec_ctx_);
des_exec_ctx_ = nullptr;
}
}
OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg)
@ -114,6 +124,15 @@ OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg)
load_mode_,
compressor_type_,
online_sample_percent_);
if (OB_SUCC(ret)) {
if (OB_ISNULL(exec_ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("exec ctx is null", KR(ret));
} else {
OB_UNIS_ENCODE(*exec_ctx_);
}
}
return ret;
}
@ -146,6 +165,16 @@ OB_DEF_DESERIALIZE(ObDirectLoadControlPreBeginArg)
load_mode_,
compressor_type_,
online_sample_percent_);
if (OB_SUCC(ret)) {
des_exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
if (des_exec_ctx_ == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate des_exec_ctx", KR(ret));
} else {
LST_DO_CODE(OB_UNIS_DECODE, *des_exec_ctx_);
}
}
return ret;
}
@ -180,6 +209,16 @@ OB_DEF_SERIALIZE_SIZE(ObDirectLoadControlPreBeginArg)
load_mode_,
compressor_type_,
online_sample_percent_);
if (OB_SUCC(ret)) {
if (OB_ISNULL(exec_ctx_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("exec ctx is null", KR(ret));
} else {
OB_UNIS_ADD_LEN(*exec_ctx_);
}
}
return len;
}

View File

@ -26,6 +26,12 @@
namespace oceanbase
{
namespace sql
{
class ObExecContext;
class ObDesExecContext;
}
namespace observer
{
enum class ObDirectLoadControlCommandType
@ -196,6 +202,9 @@ public:
storage::ObDirectLoadMode::Type load_mode_;
ObCompressorType compressor_type_;
double online_sample_percent_;
sql::ObExecContext *exec_ctx_;
common::ObArenaAllocator allocator_;
sql::ObDesExecContext *des_exec_ctx_;
};
class ObDirectLoadControlConfirmBeginArg final

View File

@ -458,6 +458,11 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
arg.load_mode_ = param_.load_mode_;
arg.compressor_type_ = param_.compressor_type_;
arg.online_sample_percent_ = param_.online_sample_percent_;
arg.exec_ctx_ = arg.session_info_->get_cur_exec_ctx();
if (arg.exec_ctx_ == nullptr) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("exec ctx is null", KR(ret));
}
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info =

View File

@ -499,7 +499,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam &param,
} else if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info))) {
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, execute_ctx_->exec_ctx_))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) {
LOG_WARN("fail to coordinator init ctx", KR(ret));

View File

@ -21,6 +21,8 @@
#include "observer/table_load/ob_table_load_trans_ctx.h"
#include "observer/table_load/ob_table_load_utils.h"
#include "share/ob_common_rpc_proxy.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/engine/ob_des_exec_context.h"
namespace oceanbase
{
@ -36,6 +38,7 @@ ObTableLoadTableCtx::ObTableLoadTableCtx()
store_ctx_(nullptr),
job_stat_(nullptr),
session_info_(nullptr),
exec_ctx_(nullptr),
allocator_("TLD_TableCtx"),
ref_count_(0),
is_assigned_resource_(false),
@ -53,16 +56,28 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx()
destroy();
}
int ObTableLoadTableCtx::new_exec_ctx()
{
int ret = OB_SUCCESS;
exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
if (exec_ctx_ == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate des_exec_ctx", KR(ret));
}
return ret;
}
int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param,
sql::ObSQLSessionInfo *session_info)
sql::ObSQLSessionInfo *session_info,
sql::ObExecContext *ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
ret = OB_INIT_TWICE;
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) {
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info || ctx == nullptr)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param));
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param), KP(ctx));
} else {
param_ = param;
ddl_param_ = ddl_param;
@ -79,10 +94,23 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
LOG_WARN("fail to create session info", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(*session_info, *session_info_, allocator_))) {
LOG_WARN("fail to deep copy", KR(ret));
} else if (OB_FAIL(new_exec_ctx())) {
LOG_WARN("fail to new exec ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(*ctx, *exec_ctx_, allocator_))) {
LOG_WARN("fail to deep copy", KR(ret));
} else {
is_inited_ = true;
}
}
if (OB_SUCC(ret)) {
if (exec_ctx_ == nullptr || session_info_ == nullptr) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("cur exec ctx is null", KR(ret), KP(session_info_), KP(exec_ctx_));
} else {
ObSQLSessionInfo::ExecCtxSessionRegister(*session_info_, *exec_ctx_);
}
}
return ret;
}
@ -253,6 +281,11 @@ void ObTableLoadTableCtx::destroy()
observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
session_info_ = nullptr;
}
if (nullptr != exec_ctx_) {
exec_ctx_->~ObDesExecContext();
allocator_.free(exec_ctx_);
exec_ctx_ = nullptr;
}
unregister_job_stat();
is_inited_ = false;
}

View File

@ -38,7 +38,7 @@ class ObTableLoadTableCtx : public common::ObDLinkBase<ObTableLoadTableCtx>
public:
ObTableLoadTableCtx();
~ObTableLoadTableCtx();
int init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info);
int init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info, sql::ObExecContext *ctx);
void stop();
void destroy();
bool is_valid() const { return is_inited_; }
@ -79,6 +79,7 @@ public:
private:
int register_job_stat();
void unregister_job_stat();
int new_exec_ctx();
public:
ObTableLoadParam param_;
ObTableLoadDDLParam ddl_param_;
@ -88,6 +89,7 @@ public:
sql::ObLoadDataGID gid_;
sql::ObLoadDataStat *job_stat_;
sql::ObSQLSessionInfo *session_info_;
sql::ObDesExecContext *exec_ctx_;
sql::ObFreeSessionCtx free_session_ctx_;
private:
// 只在初始化的时候使用, 线程不安全

View File

@ -19,6 +19,7 @@
#include "sql/session/ob_sql_session_info.h"
#include "storage/blocksstable/ob_datum_range.h"
#include "storage/blocksstable/ob_datum_row.h"
#include "sql/engine/ob_des_exec_context.h"
namespace oceanbase
{
@ -189,6 +190,32 @@ int ObTableLoadUtils::deep_copy(const ObDatumRange &src, ObDatumRange &dest, ObI
return ret;
}
int ObTableLoadUtils::deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;
char *buf = nullptr;
int64_t buf_size = src.get_serialize_size();
int64_t data_len = 0;
int64_t pos = 0;
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size));
} else if (OB_FAIL(src.serialize(buf, buf_size, pos))) {
LOG_WARN("serialize session info failed", KR(ret));
} else {
data_len = pos;
pos = 0;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(dest.deserialize(buf, data_len, pos))) {
LOG_WARN("deserialize session info failed", KR(ret));
}
}
return ret;
}
int ObTableLoadUtils::deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, ObIAllocator &allocator)
{
int ret = OB_SUCCESS;

View File

@ -41,6 +41,7 @@ namespace table
namespace sql
{
class ObSQLSessionInfo;
class ObDesExecContext;
} // namespace sql
namespace observer
{
@ -60,6 +61,7 @@ public:
static int deep_copy(const blocksstable::ObDatumRowkey &src, blocksstable::ObDatumRowkey &dest, common::ObIAllocator &allocator);
static int deep_copy(const blocksstable::ObDatumRange &src, blocksstable::ObDatumRange &dest, common::ObIAllocator &allocator);
static int deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, common::ObIAllocator &allocator);
static int deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, common::ObIAllocator &allocator);
template<class T>
static int deep_copy(const table::ObTableLoadArray<T> &src, table::ObTableLoadArray<T> &dest, common::ObIAllocator &allocator);