fix bug of direct load serialize exec context

This commit is contained in:
coolfishchen 2024-11-21 22:45:36 +00:00 committed by ob-robot
parent ffc34e8fd7
commit 8a66e7ef9e
4 changed files with 40 additions and 26 deletions

View File

@ -537,10 +537,10 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
arg.load_mode_ = param_.load_mode_;
arg.compressor_type_ = param_.compressor_type_;
arg.online_sample_percent_ = param_.online_sample_percent_;
const ObExecContext *exec_ctx = arg.session_info_->get_cur_exec_ctx();
if (exec_ctx == nullptr) {
//do nothing
} else if (OB_FAIL(arg.set_exec_ctx_serialized_str(*exec_ctx))) {
if (ctx_->exec_ctx_ == nullptr) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("exec ctx must not be nullptr", KR(ret));
} else if (OB_FAIL(arg.set_exec_ctx_serialized_str(*(ctx_->exec_ctx_)))) {
LOG_WARN("fail to set exec ctx", KR(ret));
}

View File

@ -500,7 +500,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam &param,
LOG_WARN("unexpected table ctx is not null", KR(ret));
} else if (OB_FAIL(ObTableLoadService::alloc_ctx(table_ctx))) {
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string("")))) {
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, ObString::make_string(""), execute_ctx_->exec_ctx_))) {
LOG_WARN("fail to init table ctx", KR(ret));
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) {
LOG_WARN("fail to coordinator init ctx", KR(ret));

View File

@ -45,7 +45,8 @@ ObTableLoadTableCtx::ObTableLoadTableCtx()
is_assigned_resource_(false),
is_assigned_memory_(false),
mark_delete_(false),
is_inited_(false)
is_inited_(false),
des_exec_ctx_(nullptr)
{
free_session_ctx_.sessid_ = sql::ObSQLSessionInfo::INVALID_SESSID;
allocator_.set_tenant_id(MTL_ID());
@ -56,29 +57,31 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx()
destroy();
}
int ObTableLoadTableCtx::new_exec_ctx(const ObString &exec_ctx_serialized_str)
int ObTableLoadTableCtx::new_exec_ctx(const ObString &des_exec_ctx_serialized_str)
{
int ret = OB_SUCCESS;
if (!exec_ctx_serialized_str.empty()) {
if (!des_exec_ctx_serialized_str.empty()) {
ObString tmp_str;
exec_ctx_ = OB_NEWx(ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
des_exec_ctx_ = OB_NEWx(ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
int64_t pos = 0;
if (exec_ctx_ == nullptr) {
if (des_exec_ctx_ == nullptr) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to deserialize exe ctx", KR(ret));
} else if (OB_FAIL(ob_write_string(allocator_, exec_ctx_serialized_str, tmp_str))) {
} else if (OB_FAIL(ob_write_string(allocator_, des_exec_ctx_serialized_str, tmp_str))) {
LOG_WARN("fail to copy string", KR(ret));
} else if (OB_FAIL(exec_ctx_->deserialize(tmp_str.ptr(), tmp_str.length(), pos))) {
} else if (OB_FAIL(des_exec_ctx_->deserialize(tmp_str.ptr(), tmp_str.length(), pos))) {
LOG_WARN("fail to deserialize exec ctx", KR(ret));
}
}
return ret;
}
int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param,
sql::ObSQLSessionInfo *session_info,
const common::ObString &exec_ctx_serialized_str)
int ObTableLoadTableCtx::init(const ObTableLoadParam &param,
const ObTableLoadDDLParam &ddl_param,
sql::ObSQLSessionInfo *session_info,
const common::ObString &des_exec_ctx_serialized_str,
ObExecContext *exec_ctx)
{
int ret = OB_SUCCESS;
if (IS_INIT) {
@ -87,6 +90,10 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param));
} else if ((des_exec_ctx_serialized_str.empty() && exec_ctx == nullptr)
|| (!des_exec_ctx_serialized_str.empty() && exec_ctx != nullptr)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid args", KR(ret), K(des_exec_ctx_serialized_str.empty()), KP(exec_ctx));
} else {
param_ = param;
ddl_param_ = ddl_param;
@ -103,16 +110,18 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam &param, const ObTableLoadDD
LOG_WARN("fail to create session info", KR(ret));
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(*session_info, *session_info_, allocator_))) {
LOG_WARN("fail to deep copy", KR(ret));
} else if (OB_FAIL(new_exec_ctx(exec_ctx_serialized_str))) {
} else if (OB_FAIL(new_exec_ctx(des_exec_ctx_serialized_str))) {
LOG_WARN("fail to new exec ctx", KR(ret));
} else {
is_inited_ = true;
}
}
if (OB_SUCC(ret)) {
if (exec_ctx_ != nullptr && session_info_ != nullptr) {
ObSQLSessionInfo::ExecCtxSessionRegister(*session_info_, *exec_ctx_);
if (exec_ctx != nullptr) {
exec_ctx_ = exec_ctx;
} else {
exec_ctx_ = des_exec_ctx_;
}
is_inited_ = true;
}
return ret;
@ -281,10 +290,10 @@ void ObTableLoadTableCtx::destroy()
allocator_.free(store_ctx_);
store_ctx_ = nullptr;
}
if (nullptr != exec_ctx_) {
exec_ctx_->~ObDesExecContext();
allocator_.free(exec_ctx_);
exec_ctx_ = nullptr;
if (nullptr != des_exec_ctx_) {
des_exec_ctx_->~ObDesExecContext();
allocator_.free(des_exec_ctx_);
des_exec_ctx_ = nullptr;
}
if (nullptr != session_info_) {
observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);

View File

@ -38,7 +38,11 @@ class ObTableLoadTableCtx : public common::ObDLinkBase<ObTableLoadTableCtx>
public:
ObTableLoadTableCtx();
~ObTableLoadTableCtx();
int init(const ObTableLoadParam &param, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info, const common::ObString &exec_ctx_serialized_str);
int init(const ObTableLoadParam &param,
const ObTableLoadDDLParam &ddl_param,
sql::ObSQLSessionInfo *session_info,
const common::ObString &exec_ctx_serialized_str,
sql::ObExecContext *exec_ctx = nullptr);
void stop();
void destroy();
bool is_valid() const { return is_inited_; }
@ -89,7 +93,7 @@ public:
sql::ObLoadDataGID gid_;
sql::ObLoadDataStat *job_stat_;
sql::ObSQLSessionInfo *session_info_;
sql::ObDesExecContext *exec_ctx_;
sql::ObExecContext *exec_ctx_;
sql::ObFreeSessionCtx free_session_ctx_;
private:
// 只在初始化的时候使用, 线程不安全
@ -102,6 +106,7 @@ private:
bool is_assigned_memory_;
bool mark_delete_;
bool is_inited_;
sql::ObDesExecContext *des_exec_ctx_;
DISALLOW_COPY_AND_ASSIGN(ObTableLoadTableCtx);
};