Fix direct load exec session info

This commit is contained in:
suz-yang
2023-05-24 11:11:34 +00:00
committed by ob-robot
parent 9cebbed15f
commit bdf9558e59
16 changed files with 207 additions and 84 deletions

View File

@ -76,7 +76,8 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const
*/
ObLoadDataDirectImpl::LoadExecuteContext::LoadExecuteContext()
: direct_loader_(nullptr),
: allocator_(nullptr),
direct_loader_(nullptr),
job_stat_(nullptr),
logger_(nullptr)
{
@ -84,8 +85,8 @@ ObLoadDataDirectImpl::LoadExecuteContext::LoadExecuteContext()
bool ObLoadDataDirectImpl::LoadExecuteContext::is_valid() const
{
return nullptr != exec_ctx_ && nullptr != allocator_ && nullptr != direct_loader_ &&
nullptr != job_stat_ && nullptr != logger_;
return exec_ctx_.is_valid() && nullptr != direct_loader_ && nullptr != job_stat_ &&
nullptr != logger_;
}
/**
@ -1126,7 +1127,7 @@ int ObLoadDataDirectImpl::FileLoadExecutor::execute()
LOG_WARN("fail to prepare execute", KR(ret));
}
while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->check_status())) {
while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->exec_ctx_.check_status())) {
TaskHandle *handle = nullptr;
if (OB_FAIL(get_next_task_handle(handle))) {
if (OB_UNLIKELY(OB_ITER_END != ret)) {
@ -1618,7 +1619,7 @@ int ObLoadDataDirectImpl::MultiFilesLoadTaskProcessor::process()
}
}
}
while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->check_status())) {
while (OB_SUCC(ret) && OB_SUCC(execute_ctx_->exec_ctx_.check_status())) {
if (OB_FAIL(handle_->data_buffer_.squash())) {
LOG_WARN("fail to squash data buffer", KR(ret));
} else if (OB_FAIL(data_reader_.get_next_raw_buffer(handle_->data_buffer_))) {
@ -1880,7 +1881,7 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
if (OB_FAIL(direct_loader_.commit(result_info))) {
LOG_WARN("fail to commit direct loader", KR(ret));
} else {
ObPhysicalPlanCtx *phy_plan_ctx = execute_ctx_.exec_ctx_->get_physical_plan_ctx();
ObPhysicalPlanCtx *phy_plan_ctx = ctx.get_physical_plan_ctx();
phy_plan_ctx->set_affected_rows(result_info.rows_affected_);
phy_plan_ctx->set_row_matched_count(total_line_count);
phy_plan_ctx->set_row_deleted_count(result_info.deleted_);
@ -2078,7 +2079,7 @@ int ObLoadDataDirectImpl::init_store_column_idxs(ObIArray<int64_t> &store_column
int ObLoadDataDirectImpl::init_execute_context()
{
int ret = OB_SUCCESS;
execute_ctx_.exec_ctx_ = ctx_;
execute_ctx_.exec_ctx_.exec_ctx_ = ctx_;
execute_ctx_.allocator_ = &ctx_->get_allocator();
ObTableLoadParam load_param;
load_param.tenant_id_ = execute_param_.tenant_id_;
@ -2094,8 +2095,8 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.sql_mode_ = execute_param_.sql_mode_;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
if (OB_FAIL(direct_loader_.init(load_param,
execute_param_.store_column_idxs_, &execute_ctx_))) {
if (OB_FAIL(direct_loader_.init(load_param, execute_param_.store_column_idxs_,
&execute_ctx_.exec_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));
} else if (OB_FAIL(init_logger())) {
LOG_WARN("fail to init logger", KR(ret));

View File

@ -92,13 +92,15 @@ private:
store_column_idxs_; // Mapping of stored columns to source data columns
};
struct LoadExecuteContext : public observer::ObTableLoadExecCtx
struct LoadExecuteContext
{
public:
LoadExecuteContext();
bool is_valid() const;
TO_STRING_KV(KP_(exec_ctx), KP_(allocator), KP_(direct_loader), KP_(job_stat), KP_(logger));
TO_STRING_KV(K_(exec_ctx), KP_(allocator), KP_(direct_loader), KP_(job_stat), KP_(logger));
public:
observer::ObTableLoadSqlExecCtx exec_ctx_;
common::ObIAllocator *allocator_;
observer::ObTableLoadInstance *direct_loader_;
sql::ObLoadDataStat *job_stat_;
Logger *logger_;

View File

@ -37,16 +37,15 @@ int ObTableDirectInsertCtx::init(ObExecContext *exec_ctx,
ret = OB_INVALID_ARGUMENT;
LOG_WARN("exec_ctx cannot be null", KR(ret));
} else {
load_exec_ctx_ = (ObTableLoadExecCtx *)exec_ctx->get_allocator().alloc(sizeof(ObTableLoadExecCtx));
table_load_instance_ = (ObTableLoadInstance *)exec_ctx->get_allocator().alloc(sizeof(ObTableLoadInstance));
if (OB_ISNULL(load_exec_ctx_) || OB_ISNULL(table_load_instance_)) {
if (OB_ISNULL(load_exec_ctx_ = OB_NEWx(ObTableLoadSqlExecCtx, &exec_ctx->get_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", KR(ret));
LOG_WARN("fail to new ObTableLoadSqlExecCtx", KR(ret));
} else if (OB_ISNULL(table_load_instance_ =
OB_NEWx(ObTableLoadInstance, &exec_ctx->get_allocator()))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to new ObTableLoadInstance", KR(ret));
} else {
new (load_exec_ctx_) ObTableLoadExecCtx;
new (table_load_instance_) ObTableLoadInstance;
load_exec_ctx_->exec_ctx_ = exec_ctx;
load_exec_ctx_->allocator_ = &(exec_ctx->get_allocator());
uint64_t sql_mode = 0;
ObSEArray<int64_t, 16> store_column_idxs;
omt::ObTenant *tenant = nullptr;
@ -118,7 +117,7 @@ void ObTableDirectInsertCtx::destroy()
table_load_instance_ = nullptr;
}
if (OB_NOT_NULL(load_exec_ctx_)) {
load_exec_ctx_->~ObTableLoadExecCtx();
load_exec_ctx_->~ObTableLoadSqlExecCtx();
load_exec_ctx_ = nullptr;
}
}

View File

@ -10,7 +10,7 @@ namespace oceanbase
{
namespace observer
{
class ObTableLoadExecCtx;
class ObTableLoadSqlExecCtx;
class ObTableLoadInstance;
}
@ -36,7 +36,7 @@ private:
int init_store_column_idxs(const uint64_t tenant_id, const uint64_t table_id,
common::ObIArray<int64_t> &store_column_idxs);
private:
observer::ObTableLoadExecCtx *load_exec_ctx_;
observer::ObTableLoadSqlExecCtx *load_exec_ctx_;
observer::ObTableLoadInstance *table_load_instance_;
bool is_inited_;
};