[FEAT MERGE] incremental direct load phase I

Co-authored-by: Monk-Liu <1152761042@qq.com>
Co-authored-by: suz-yang <suz.yang@foxmail.com>
Co-authored-by: ZenoWang <wzybuaasoft@163.com>
This commit is contained in:
coolfishchen
2024-04-22 09:23:47 +00:00
committed by ob-robot
parent 4d7b31b518
commit 9de65fb1d7
278 changed files with 13417 additions and 6679 deletions

View File

@ -57,7 +57,6 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
: tenant_id_(OB_INVALID_ID),
database_id_(OB_INVALID_ID),
table_id_(OB_INVALID_ID),
sql_mode_(0),
parallel_(0),
thread_count_(0),
batch_row_count_(0),
@ -66,7 +65,9 @@ ObLoadDataDirectImpl::LoadExecuteParam::LoadExecuteParam()
online_opt_stat_gather_(false),
max_error_rows_(-1),
ignore_row_num_(-1),
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE)
dup_action_(ObLoadDupActionType::LOAD_INVALID_MODE),
method_(ObDirectLoadMethod::INVALID_METHOD),
insert_mode_(ObDirectLoadInsertMode::INVALID_INSERT_MODE)
{
store_column_idxs_.set_tenant_id(MTL_ID());
}
@ -77,8 +78,19 @@ bool ObLoadDataDirectImpl::LoadExecuteParam::is_valid() const
OB_INVALID_ID != table_id_ && !database_name_.empty() && !table_name_.empty() &&
!combined_name_.empty() && parallel_ > 0 && thread_count_ > 0 && batch_row_count_ > 0 &&
data_mem_usage_limit_ > 0 && max_error_rows_ >= 0 && ignore_row_num_ >= 0 &&
ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ && data_access_param_.is_valid() &&
!store_column_idxs_.empty();
ObLoadDupActionType::LOAD_INVALID_MODE != dup_action_ &&
ObDirectLoadMethod::is_type_valid(method_) &&
ObDirectLoadInsertMode::is_type_valid(insert_mode_) &&
(storage::ObDirectLoadMethod::is_full(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_full_method(insert_mode_)
: true) &&
(storage::ObDirectLoadMethod::is_incremental(method_)
? storage::ObDirectLoadInsertMode::is_valid_for_incremental_method(insert_mode_)
: true) &&
(storage::ObDirectLoadInsertMode::INC_REPLACE == insert_mode_
? sql::ObLoadDupActionType::LOAD_REPLACE == dup_action_
: true) &&
data_access_param_.is_valid() && !store_column_idxs_.empty();
}
/**
@ -1831,10 +1843,10 @@ int ObLoadDataDirectImpl::execute(ObExecContext &ctx, ObLoadDataStmt &load_stmt)
}
if (OB_SUCC(ret)) {
ObTableLoadResultInfo result_info;
if (OB_FAIL(direct_loader_.commit(result_info))) {
if (OB_FAIL(direct_loader_.commit())) {
LOG_WARN("fail to commit direct loader", KR(ret));
} else {
const ObTableLoadResultInfo &result_info = direct_loader_.get_result_info();
ObPhysicalPlanCtx *phy_plan_ctx = ctx.get_physical_plan_ctx();
phy_plan_ctx->set_affected_rows(result_info.rows_affected_);
phy_plan_ctx->set_row_matched_count(total_line_count);
@ -1889,31 +1901,36 @@ int ObLoadDataDirectImpl::init_execute_param()
hint_batch_size > 0 ? hint_batch_size : DEFAULT_BUFFERRED_ROW_COUNT;
}
}
// need_sort_
// direct load hint
if (OB_SUCC(ret)) {
int64_t enable_direct = 0;
int64_t hint_need_sort = 0;
if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::NEED_SORT, hint_need_sort))) {
LOG_WARN("fail to get value of NEED_SORT", KR(ret), K(hint));
} else if (enable_direct != 0) {
execute_param_.need_sort_ = hint_need_sort > 0 ? true : false;
} else {
const ObDirectLoadHint &direct_load_hint = hint.get_direct_load_hint();
if (direct_load_hint.is_enable()) {
execute_param_.need_sort_ = direct_load_hint.need_sort();
execute_param_.max_error_rows_ = direct_load_hint.get_max_error_row_count();
execute_param_.method_ =
(direct_load_hint.is_inc_direct_load() ? ObDirectLoadMethod::INCREMENTAL
: ObDirectLoadMethod::FULL);
execute_param_.insert_mode_ = ObDirectLoadInsertMode::NORMAL;
if (OB_UNLIKELY(direct_load_hint.is_inc_load_method())) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("inc load method not supported", KR(ret), K(direct_load_hint));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "inc load method in direct load is");
} else if (direct_load_hint.is_inc_replace_load_method()) {
if (OB_UNLIKELY(ObLoadDupActionType::LOAD_STOP_ON_DUP != load_args.dupl_action_)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("replace or ignore for inc_replace load method not supported", KR(ret),
K(direct_load_hint), K(load_args.dupl_action_));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "replace or ignore for inc_replace load method in direct load is");
} else {
execute_param_.dup_action_ = ObLoadDupActionType::LOAD_REPLACE; // rewrite dup action
execute_param_.insert_mode_ = ObDirectLoadInsertMode::INC_REPLACE;
}
}
} else { // append
execute_param_.need_sort_ = true;
}
}
// sql_mode_
if (OB_SUCC(ret)) {
ObSQLSessionInfo *session = nullptr;
uint64_t sql_mode;
if (OB_ISNULL(session = ctx_->get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session is null", KR(ret));
} else if (OB_FAIL(session->get_sys_variable(SYS_VAR_SQL_MODE, sql_mode))) {
LOG_WARN("fail to get sys variable", K(ret));
} else {
execute_param_.sql_mode_ = sql_mode;
execute_param_.max_error_rows_ = 0;
execute_param_.method_ = ObDirectLoadMethod::FULL;
execute_param_.insert_mode_ = ObDirectLoadInsertMode::NORMAL;
}
}
// online_opt_stat_gather_
@ -1934,23 +1951,6 @@ int ObLoadDataDirectImpl::init_execute_param()
execute_param_.online_opt_stat_gather_ = false;
}
}
// max_error_rows_
if (OB_SUCC(ret)) {
int64_t append = 0;
int64_t enable_direct = 0;
int64_t hint_error_rows = 0;
if (OB_FAIL(hint.get_value(ObLoadDataHint::APPEND, append))) {
LOG_WARN("fail to get value of APPEND", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::ENABLE_DIRECT, enable_direct))) {
LOG_WARN("fail to get value of ENABLE_DIRECT", K(ret));
} else if (OB_FAIL(hint.get_value(ObLoadDataHint::ERROR_ROWS, hint_error_rows))) {
LOG_WARN("fail to get value of ERROR_ROWS", KR(ret), K(hint));
} else if (enable_direct != 0) {
execute_param_.max_error_rows_ = hint_error_rows;
} else {
execute_param_.max_error_rows_ = 0;
}
}
// data_access_param_
if (OB_SUCC(ret)) {
DataAccessParam &data_access_param = execute_param_.data_access_param_;
@ -2040,11 +2040,10 @@ int ObLoadDataDirectImpl::init_execute_context()
load_param.column_count_ = execute_param_.store_column_idxs_.count();
load_param.need_sort_ = execute_param_.need_sort_;
load_param.dup_action_ = execute_param_.dup_action_;
load_param.sql_mode_ = execute_param_.sql_mode_;
load_param.px_mode_ = false;
load_param.online_opt_stat_gather_ = execute_param_.online_opt_stat_gather_;
load_param.method_ = ObDirectLoadMethod::FULL;
load_param.insert_mode_ = ObDirectLoadInsertMode::NORMAL;
load_param.method_ = execute_param_.method_;
load_param.insert_mode_ = execute_param_.insert_mode_;
if (OB_FAIL(direct_loader_.init(load_param, execute_param_.store_column_idxs_,
&execute_ctx_.exec_ctx_))) {
LOG_WARN("fail to init direct loader", KR(ret));