Revert serialize exec context for direct load
This commit is contained in:
parent
ce5141ebdd
commit
f39b2fef6e
@ -16,7 +16,6 @@
|
||||
#include "observer/table_load/ob_table_load_service.h"
|
||||
#include "observer/table_load/ob_table_load_store.h"
|
||||
#include "observer/table_load/ob_table_load_table_ctx.h"
|
||||
#include "sql/engine/ob_des_exec_context.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -100,7 +99,7 @@ int ObDirectLoadControlPreBeginExecutor::create_table_ctx(const ObTableLoadParam
|
||||
if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
|
||||
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_, arg_.des_exec_ctx_))) {
|
||||
} else if (OB_FAIL(table_ctx->init(param, ddl_param, arg_.session_info_))) {
|
||||
LOG_WARN("fail to init table ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadStore::init_ctx(table_ctx, arg_.partition_id_array_,
|
||||
arg_.target_partition_id_array_))) {
|
||||
|
@ -14,7 +14,6 @@
|
||||
|
||||
#include "ob_table_load_control_rpc_struct.h"
|
||||
#include "observer/table_load/ob_table_load_utils.h"
|
||||
#include "sql/engine/ob_des_exec_context.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -70,10 +69,7 @@ ObDirectLoadControlPreBeginArg::ObDirectLoadControlPreBeginArg()
|
||||
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE),
|
||||
load_mode_(ObDirectLoadMode::INVALID_MODE),
|
||||
compressor_type_(ObCompressorType::INVALID_COMPRESSOR),
|
||||
online_sample_percent_(1.),
|
||||
exec_ctx_(nullptr),
|
||||
allocator_("TLD_pre_begin"),
|
||||
des_exec_ctx_(nullptr)
|
||||
online_sample_percent_(1.)
|
||||
{
|
||||
free_session_ctx_.sessid_ = ObSQLSessionInfo::INVALID_SESSID;
|
||||
}
|
||||
@ -86,12 +82,6 @@ ObDirectLoadControlPreBeginArg::~ObDirectLoadControlPreBeginArg()
|
||||
}
|
||||
session_info_ = nullptr;
|
||||
}
|
||||
|
||||
if (nullptr != des_exec_ctx_) {
|
||||
des_exec_ctx_->~ObDesExecContext();
|
||||
allocator_.free(des_exec_ctx_);
|
||||
des_exec_ctx_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg)
|
||||
@ -124,15 +114,6 @@ OB_DEF_SERIALIZE(ObDirectLoadControlPreBeginArg)
|
||||
load_mode_,
|
||||
compressor_type_,
|
||||
online_sample_percent_);
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(exec_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("exec ctx is null", KR(ret));
|
||||
} else {
|
||||
OB_UNIS_ENCODE(*exec_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -165,16 +146,6 @@ OB_DEF_DESERIALIZE(ObDirectLoadControlPreBeginArg)
|
||||
load_mode_,
|
||||
compressor_type_,
|
||||
online_sample_percent_);
|
||||
if (OB_SUCC(ret)) {
|
||||
des_exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
|
||||
|
||||
if (des_exec_ctx_ == nullptr) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to allocate des_exec_ctx", KR(ret));
|
||||
} else {
|
||||
LST_DO_CODE(OB_UNIS_DECODE, *des_exec_ctx_);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -209,16 +180,6 @@ OB_DEF_SERIALIZE_SIZE(ObDirectLoadControlPreBeginArg)
|
||||
load_mode_,
|
||||
compressor_type_,
|
||||
online_sample_percent_);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(exec_ctx_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("exec ctx is null", KR(ret));
|
||||
} else {
|
||||
OB_UNIS_ADD_LEN(*exec_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
|
@ -26,12 +26,6 @@
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
namespace sql
|
||||
{
|
||||
class ObExecContext;
|
||||
class ObDesExecContext;
|
||||
}
|
||||
|
||||
namespace observer
|
||||
{
|
||||
enum class ObDirectLoadControlCommandType
|
||||
@ -202,9 +196,6 @@ public:
|
||||
storage::ObDirectLoadMode::Type load_mode_;
|
||||
ObCompressorType compressor_type_;
|
||||
double online_sample_percent_;
|
||||
sql::ObExecContext *exec_ctx_;
|
||||
common::ObArenaAllocator allocator_;
|
||||
sql::ObDesExecContext *des_exec_ctx_;
|
||||
};
|
||||
|
||||
class ObDirectLoadControlConfirmBeginArg final
|
||||
|
@ -458,11 +458,6 @@ int ObTableLoadCoordinator::pre_begin_peers(ObDirectLoadResourceApplyArg &apply_
|
||||
arg.load_mode_ = param_.load_mode_;
|
||||
arg.compressor_type_ = param_.compressor_type_;
|
||||
arg.online_sample_percent_ = param_.online_sample_percent_;
|
||||
arg.exec_ctx_ = arg.session_info_->get_cur_exec_ctx();
|
||||
if (arg.exec_ctx_ == nullptr) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("exec ctx is null", KR(ret));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < all_leader_info_array.count(); ++i) {
|
||||
const ObTableLoadPartitionLocation::LeaderInfo &leader_info = all_leader_info_array.at(i);
|
||||
const ObTableLoadPartitionLocation::LeaderInfo &target_leader_info =
|
||||
|
@ -499,7 +499,7 @@ int ObTableLoadInstance::start_direct_load(const ObTableLoadParam ¶m,
|
||||
} else if (OB_ISNULL(table_ctx = ObTableLoadService::alloc_ctx())) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to alloc table ctx", KR(ret), K(param));
|
||||
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info, execute_ctx_->exec_ctx_))) {
|
||||
} else if (OB_FAIL(table_ctx->init(param, stmt_ctx_.ddl_param_, session_info))) {
|
||||
LOG_WARN("fail to init table ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadCoordinator::init_ctx(table_ctx, column_ids, execute_ctx_))) {
|
||||
LOG_WARN("fail to coordinator init ctx", KR(ret));
|
||||
|
@ -21,8 +21,6 @@
|
||||
#include "observer/table_load/ob_table_load_trans_ctx.h"
|
||||
#include "observer/table_load/ob_table_load_utils.h"
|
||||
#include "share/ob_common_rpc_proxy.h"
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "sql/engine/ob_des_exec_context.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -38,7 +36,6 @@ ObTableLoadTableCtx::ObTableLoadTableCtx()
|
||||
store_ctx_(nullptr),
|
||||
job_stat_(nullptr),
|
||||
session_info_(nullptr),
|
||||
exec_ctx_(nullptr),
|
||||
allocator_("TLD_TableCtx"),
|
||||
ref_count_(0),
|
||||
is_assigned_resource_(false),
|
||||
@ -56,28 +53,16 @@ ObTableLoadTableCtx::~ObTableLoadTableCtx()
|
||||
destroy();
|
||||
}
|
||||
|
||||
int ObTableLoadTableCtx::new_exec_ctx()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
exec_ctx_ = OB_NEWx(sql::ObDesExecContext, &allocator_, allocator_, GCTX.session_mgr_);
|
||||
if (exec_ctx_ == nullptr) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to allocate des_exec_ctx", KR(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param,
|
||||
sql::ObSQLSessionInfo *session_info,
|
||||
sql::ObExecContext *ctx)
|
||||
sql::ObSQLSessionInfo *session_info)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_INIT) {
|
||||
ret = OB_INIT_TWICE;
|
||||
LOG_WARN("ObTableLoadTableCtx init twice", KR(ret));
|
||||
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info || ctx == nullptr)) {
|
||||
} else if (OB_UNLIKELY(!param.is_valid() || !ddl_param.is_valid() || nullptr == session_info)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param), KP(ctx));
|
||||
LOG_WARN("invalid args", KR(ret), K(param), K(ddl_param));
|
||||
} else {
|
||||
param_ = param;
|
||||
ddl_param_ = ddl_param;
|
||||
@ -94,23 +79,10 @@ int ObTableLoadTableCtx::init(const ObTableLoadParam ¶m, const ObTableLoadDD
|
||||
LOG_WARN("fail to create session info", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(*session_info, *session_info_, allocator_))) {
|
||||
LOG_WARN("fail to deep copy", KR(ret));
|
||||
} else if (OB_FAIL(new_exec_ctx())) {
|
||||
LOG_WARN("fail to new exec ctx", KR(ret));
|
||||
} else if (OB_FAIL(ObTableLoadUtils::deep_copy(*ctx, *exec_ctx_, allocator_))) {
|
||||
LOG_WARN("fail to deep copy", KR(ret));
|
||||
} else {
|
||||
is_inited_ = true;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (exec_ctx_ == nullptr || session_info_ == nullptr) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("cur exec ctx is null", KR(ret), KP(session_info_), KP(exec_ctx_));
|
||||
} else {
|
||||
ObSQLSessionInfo::ExecCtxSessionRegister(*session_info_, *exec_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -281,11 +253,6 @@ void ObTableLoadTableCtx::destroy()
|
||||
observer::ObTableLoadUtils::free_session_info(session_info_, free_session_ctx_);
|
||||
session_info_ = nullptr;
|
||||
}
|
||||
if (nullptr != exec_ctx_) {
|
||||
exec_ctx_->~ObDesExecContext();
|
||||
allocator_.free(exec_ctx_);
|
||||
exec_ctx_ = nullptr;
|
||||
}
|
||||
unregister_job_stat();
|
||||
is_inited_ = false;
|
||||
}
|
||||
|
@ -38,7 +38,7 @@ class ObTableLoadTableCtx : public common::ObDLinkBase<ObTableLoadTableCtx>
|
||||
public:
|
||||
ObTableLoadTableCtx();
|
||||
~ObTableLoadTableCtx();
|
||||
int init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info, sql::ObExecContext *ctx);
|
||||
int init(const ObTableLoadParam ¶m, const ObTableLoadDDLParam &ddl_param, sql::ObSQLSessionInfo *session_info);
|
||||
void stop();
|
||||
void destroy();
|
||||
bool is_valid() const { return is_inited_; }
|
||||
@ -79,7 +79,6 @@ public:
|
||||
private:
|
||||
int register_job_stat();
|
||||
void unregister_job_stat();
|
||||
int new_exec_ctx();
|
||||
public:
|
||||
ObTableLoadParam param_;
|
||||
ObTableLoadDDLParam ddl_param_;
|
||||
@ -89,7 +88,6 @@ public:
|
||||
sql::ObLoadDataGID gid_;
|
||||
sql::ObLoadDataStat *job_stat_;
|
||||
sql::ObSQLSessionInfo *session_info_;
|
||||
sql::ObDesExecContext *exec_ctx_;
|
||||
sql::ObFreeSessionCtx free_session_ctx_;
|
||||
private:
|
||||
// 只在初始化的时候使用, 线程不安全
|
||||
|
@ -19,7 +19,6 @@
|
||||
#include "sql/session/ob_sql_session_info.h"
|
||||
#include "storage/blocksstable/ob_datum_range.h"
|
||||
#include "storage/blocksstable/ob_datum_row.h"
|
||||
#include "sql/engine/ob_des_exec_context.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -190,32 +189,6 @@ int ObTableLoadUtils::deep_copy(const ObDatumRange &src, ObDatumRange &dest, ObI
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObTableLoadUtils::deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
char *buf = nullptr;
|
||||
int64_t buf_size = src.get_serialize_size();
|
||||
int64_t data_len = 0;
|
||||
int64_t pos = 0;
|
||||
if (OB_ISNULL(buf = static_cast<char *>(allocator.alloc(buf_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
LOG_WARN("fail to allocate buffer", KR(ret), K(buf_size));
|
||||
} else if (OB_FAIL(src.serialize(buf, buf_size, pos))) {
|
||||
LOG_WARN("serialize session info failed", KR(ret));
|
||||
} else {
|
||||
data_len = pos;
|
||||
pos = 0;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(dest.deserialize(buf, data_len, pos))) {
|
||||
LOG_WARN("deserialize session info failed", KR(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int ObTableLoadUtils::deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -41,7 +41,6 @@ namespace table
|
||||
namespace sql
|
||||
{
|
||||
class ObSQLSessionInfo;
|
||||
class ObDesExecContext;
|
||||
} // namespace sql
|
||||
namespace observer
|
||||
{
|
||||
@ -61,7 +60,6 @@ public:
|
||||
static int deep_copy(const blocksstable::ObDatumRowkey &src, blocksstable::ObDatumRowkey &dest, common::ObIAllocator &allocator);
|
||||
static int deep_copy(const blocksstable::ObDatumRange &src, blocksstable::ObDatumRange &dest, common::ObIAllocator &allocator);
|
||||
static int deep_copy(const sql::ObSQLSessionInfo &src, sql::ObSQLSessionInfo &dest, common::ObIAllocator &allocator);
|
||||
static int deep_copy(const sql::ObExecContext &src, sql::ObDesExecContext &dest, common::ObIAllocator &allocator);
|
||||
|
||||
template<class T>
|
||||
static int deep_copy(const table::ObTableLoadArray<T> &src, table::ObTableLoadArray<T> &dest, common::ObIAllocator &allocator);
|
||||
|
Loading…
x
Reference in New Issue
Block a user