add for replace and insert_up with batch_optimization

This commit is contained in:
yishenglanlingzui
2023-02-17 07:12:56 +00:00
committed by ob-robot
parent 671aeb2cea
commit f001af9558
7 changed files with 170 additions and 80 deletions

View File

@ -565,6 +565,10 @@ int ObDmlCgService::get_table_unique_key_exprs(ObLogDelUpd &op,
}
}
// The reason why batch optimization adds stmt_id to unique_key is
// For multi_update/multi_delete statements that need to be deduplicated,
// if different stmt_id statements exist to process duplicate rows,
// Will be regarded as duplicate rows by deduplication logic, adding stmt_id to unique_key can avoid this problem
if (OB_SUCC(ret) && op.get_plan()->get_optimizer_context().is_batched_multi_stmt()) {
ObRawExpr *stmt_id_expr = nullptr;
if (op.get_stmt_id_expr() == nullptr) {
@ -579,6 +583,28 @@ int ObDmlCgService::get_table_unique_key_exprs(ObLogDelUpd &op,
return ret;
}
// This function is only used for insert_up qualified replace_into
// When generating an insert to generate a primary key conflict,
// the conflicting column information needs to be returned
// For a partitioned table without primary key, return hidden primary key + partition key
int ObDmlCgService::table_unique_key_for_conflict_checker(ObLogDelUpd &op,
const IndexDMLInfo &index_dml_info,
ObIArray<ObRawExpr*> &rowkey_exprs)
{
int ret = OB_SUCCESS;
bool is_heap_table = false;
if (OB_FAIL(check_is_heap_table(op, index_dml_info.ref_table_id_, is_heap_table))) {
LOG_WARN("check is heap table failed", K(ret));
} else if (OB_FAIL(index_dml_info.get_rowkey_exprs(rowkey_exprs))) {
LOG_WARN("get table rowkey failed", K(ret), K(index_dml_info));
} else if (is_heap_table) {
if (OB_FAIL(get_heap_table_part_exprs(op, index_dml_info, rowkey_exprs))) {
LOG_WARN("get heap table part exprs failed", K(ret), K(index_dml_info));
}
}
return ret;
}
int ObDmlCgService::get_heap_table_part_exprs(const ObLogicalOperator &op,
const IndexDMLInfo &index_dml_info,
ObIArray<ObRawExpr*> &part_key_exprs)
@ -639,10 +665,11 @@ int ObDmlCgService::convert_data_table_rowkey_info(ObLogDelUpd &op,
ObSEArray<uint64_t, 8> rowkey_column_ids;
ObSEArray<ObRawExpr *, 8> rowkey_exprs;
ObSEArray<ObObjMeta, 8> rowkey_column_types;
// rowkey_exprs的类型一定是column_ref表达式
if (OB_ISNULL(primary_dml_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("primary_index_dml_info is null", K(ret));
} else if (OB_FAIL(get_table_unique_key_exprs(op, *primary_dml_info, rowkey_exprs))) {
} else if (OB_FAIL(table_unique_key_for_conflict_checker(op, *primary_dml_info, rowkey_exprs))) {
LOG_WARN("get table unique key exprs failed", K(ret), KPC(primary_dml_info));
}
@ -736,18 +763,21 @@ int ObDmlCgService::generate_conflict_checker_ctdef(ObLogInsert &op,
int ret = OB_SUCCESS;
ObSEArray<ObRawExpr *, 8> rowkey_exprs;
if (OB_FAIL(get_table_unique_key_exprs(op, index_dml_info, rowkey_exprs))) {
// todo @jingfeng.jf need fix some bug of generated_column
if (OB_FAIL(table_unique_key_for_conflict_checker(op, index_dml_info, rowkey_exprs))) {
LOG_WARN("get table unique key exprs failed", K(ret), K(index_dml_info));
} else if (OB_FAIL(adjust_unique_key_exprs(rowkey_exprs))) {
// 替换其中的生成列
LOG_WARN("fail to replace generated column exprs", K(ret), K(rowkey_exprs));
} else if (OB_FAIL(cg_.generate_rt_exprs(rowkey_exprs, conflict_checker_ctdef.data_table_rowkey_expr_))) {
LOG_WARN("fail to generate data_table rowkey_expr", K(ret), K(rowkey_exprs));
} else if (OB_FAIL(generate_scan_ctdef(op, index_dml_info, conflict_checker_ctdef.das_scan_ctdef_))) {
LOG_WARN("fail to generate das_scan_ctdef", K(ret));
} else if (OB_FAIL(generate_constraint_infos(op,
index_dml_info,
conflict_checker_ctdef.cst_ctdefs_))) {
LOG_WARN("fail to generate constraint_infos", K(ret));
} else if (OB_FAIL(cg_.generate_rt_exprs(rowkey_exprs,
conflict_checker_ctdef.data_table_rowkey_expr_))) {
LOG_WARN("fail to generate data_table rowkey_expr", K(ret), K(rowkey_exprs));
} else if (OB_FAIL(cg_.generate_rt_exprs(index_dml_info.column_old_values_exprs_,
} else if (OB_FAIL(cg_.generate_rt_exprs(index_dml_info.column_old_values_exprs_,
conflict_checker_ctdef.table_column_exprs_))) {
LOG_WARN("fail to generate table columns rt exprs ", K(ret));
} else {