[FEAT MERGE] merge table insert overwrite

This commit is contained in:
Minionyh
2024-06-18 02:35:49 +00:00
committed by ob-robot
parent 046dac469a
commit ca53fe797d
37 changed files with 271 additions and 36 deletions

View File

@ -2752,11 +2752,35 @@ int ObStaticEngineCG::generate_spec(ObLogicalOperator &op,
UNUSED(in_root_job);
return OB_SUCCESS;
}
int ObStaticEngineCG::check_is_insert_overwrite_stmt(const ObLogPlan *plan, bool &is_insert_overwrite)
{
int ret = OB_SUCCESS;
is_insert_overwrite = false;
stmt::StmtType stmt_type = stmt::T_NONE;
if (OB_ISNULL(plan)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("log plan is null", K(ret), KP(this));
} else {
if (OB_FAIL(plan->get_stmt_type(stmt_type))) {
LOG_WARN("get stmt type of log plan failed", K(ret));
} else if (IS_INSERT_OR_REPLACE_STMT(stmt_type)) {
const ObInsertStmt *insert_stmt = static_cast<const ObInsertStmt *>(plan->get_stmt());
if (OB_NOT_NULL(insert_stmt)) {
if (insert_stmt->is_overwrite()) {
is_insert_overwrite = true;
}
}
}
}
return ret;
}
int ObStaticEngineCG::generate_insert_with_das(ObLogInsert &op, ObTableInsertSpec &spec)
{
int ret = OB_SUCCESS;
spec.check_fk_batch_ = true;
const ObLogPlan *log_plan = op.get_plan();
const ObIArray<IndexDMLInfo *> &index_dml_infos = op.get_index_dml_infos();
if (OB_ISNULL(phy_plan_)) {
ret = OB_INVALID_ARGUMENT;
@ -2769,6 +2793,16 @@ int ObStaticEngineCG::generate_insert_with_das(ObLogInsert &op, ObTableInsertSpe
}
}
if (OB_SUCC(ret)) {
bool is_insert_overwrite = false;
if (OB_FAIL(check_is_insert_overwrite_stmt(log_plan, is_insert_overwrite))) {
LOG_WARN("check is insert overwrite failed", K(ret));
} else if (is_insert_overwrite) {
ret = OB_NOT_SUPPORTED;
LOG_USER_ERROR(OB_NOT_SUPPORTED, "insert overwrite need open pdml");
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(spec.ins_ctdefs_.allocate_array(phy_plan_->get_allocator(), 1))) {
LOG_WARN("allocate insert ctdef array failed", K(ret), K(1));
@ -6460,8 +6494,10 @@ int ObStaticEngineCG::generate_spec(ObLogInsert &op,
{
int ret = OB_SUCCESS;
UNUSED(in_root_job);
const ObLogPlan *log_plan = op.get_plan();
if (OB_UNLIKELY(op.get_index_dml_infos().count() != 1) ||
OB_ISNULL(op.get_index_dml_infos().at(0))) {
OB_ISNULL(op.get_index_dml_infos().at(0)) ||
OB_ISNULL(log_plan)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("index dml info is invalid", K(ret), K(op.get_index_dml_infos().count()));
} else {
@ -6477,9 +6513,18 @@ int ObStaticEngineCG::generate_spec(ObLogInsert &op,
spec.plan_->set_enable_append(global_hint.has_direct_load());
spec.plan_->set_enable_inc_direct_load(global_hint.has_inc_direct_load());
spec.plan_->set_enable_replace(global_hint.has_replace());
// check is insert overwrite
bool is_insert_overwrite = false;
if (OB_FAIL(check_is_insert_overwrite_stmt(log_plan, is_insert_overwrite))) {
LOG_WARN("check is insert overwrite failed", K(ret));
} else if (is_insert_overwrite) {
spec.plan_->set_is_insert_overwrite(true);
}
}
int64_t partition_expr_idx = OB_INVALID_INDEX;
if (OB_FAIL(get_pdml_partition_id_column_idx(spec.get_child(0)->output_, partition_expr_idx))) {
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_FAIL(get_pdml_partition_id_column_idx(spec.get_child(0)->output_, partition_expr_idx))) {
LOG_WARN("failed to get partition id column idx", K(ret));
} else {
spec.row_desc_.set_part_id_index(partition_expr_idx);

View File

@ -564,6 +564,7 @@ private:
int extract_all_mview_ids(const ObIArray<ObRawExpr *> &exprs);
int extract_all_mview_ids(const ObRawExpr *expr);
int check_is_insert_overwrite_stmt(const ObLogPlan *plan, bool &is_insert_overwrite);
private:
struct BatchExecParamCache {
BatchExecParamCache(ObExecParamRawExpr* expr, ObOpSpec* spec, bool is_left)