support pdml for joined table

This commit is contained in:
sdc
2024-11-26 20:16:24 +00:00
committed by ob-robot
parent 51391ef173
commit 29062a71c2
6 changed files with 38 additions and 27 deletions

View File

@ -280,12 +280,14 @@ int ObDelUpdLogPlan::generate_dblink_raw_plan()
// 不过,这个第一行根据不同的join算法有随机性。
//
int ObDelUpdLogPlan::check_table_rowkey_distinct(
const ObIArray<IndexDMLInfo *> &index_dml_infos)
const ObIArray<IndexDMLInfo *> &index_dml_infos,
bool &need_duplicate_date)
{
int ret = OB_SUCCESS;
ObLogicalOperator *best_plan = NULL;
const ObDelUpdStmt *del_upd_stmt = NULL;
ObSEArray<ObRawExpr *, 8> rowkey_exprs;
need_duplicate_date = false;
if (OB_ISNULL(del_upd_stmt = get_stmt())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(del_upd_stmt));
@ -297,6 +299,8 @@ int ObDelUpdLogPlan::check_table_rowkey_distinct(
} else if (!del_upd_stmt->is_dml_table_from_join() ||
del_upd_stmt->has_instead_of_trigger()) {
//dml语句中不包含join条件,可以保证dml涉及到的行都来自于target table,不存在重复行,因此不需要去重
LOG_TRACE("skip check_table_rowkey_distinct", K(del_upd_stmt->is_dml_table_from_join()),
K(del_upd_stmt->dml_source_from_join()), K(del_upd_stmt->has_instead_of_trigger()));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < index_dml_infos.count(); ++i) {
IndexDMLInfo *index_dml_info = index_dml_infos.at(i);
@ -318,9 +322,12 @@ int ObDelUpdLogPlan::check_table_rowkey_distinct(
LOG_WARN("check dml is order unique failed", K(ret));
} else if (!is_unique) {
index_dml_info->distinct_algo_ = T_HASH_DISTINCT;
// need_duplicate_date = true means we cannot generate RANDOM exchange even if it's a heap table.
need_duplicate_date = true;
} else {
index_dml_info->distinct_algo_ = T_DISTINCT_NONE;
}
LOG_TRACE("check_table_rowkey_distinct", K(index_dml_infos.count()), K(i), K(is_unique), KPC(index_dml_info));
}
}
return ret;
@ -485,6 +492,7 @@ int ObDelUpdLogPlan::compute_exchange_info_for_pdml_del_upd(const ObShardingInfo
const ObTablePartitionInfo &target_table_partition,
const IndexDMLInfo &index_dml_info,
bool is_index_maintenance,
bool need_duplicate_date,
ObExchangeInfo &exch_info)
{
int ret = OB_SUCCESS;
@ -515,32 +523,25 @@ int ObDelUpdLogPlan::compute_exchange_info_for_pdml_del_upd(const ObShardingInfo
exch_info.repartition_table_id_ = index_dml_info.loc_table_id_;
exch_info.repartition_table_name_ = index_dml_info.index_name_;
}
bool can_use_random = !get_stmt()->is_merge_stmt() && !need_duplicate_date &&
get_optimizer_context().is_pdml_heap_table() && !is_index_maintenance;
if (share::schema::PARTITION_LEVEL_ONE == part_level) {
exch_info.repartition_type_ = OB_REPARTITION_ONE_SIDE_ONE_LEVEL;
if (!get_stmt()->is_merge_stmt() &&
get_optimizer_context().is_pdml_heap_table() && !is_index_maintenance) {
exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_RANDOM;
} else {
exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
}
exch_info.dist_method_ = can_use_random
? ObPQDistributeMethod::PARTITION_RANDOM
: ObPQDistributeMethod::PARTITION_HASH;
LOG_TRACE("partition level is one, use pkey reshuffle method");
} else if (share::schema::PARTITION_LEVEL_TWO == part_level) {
exch_info.repartition_type_ = OB_REPARTITION_ONE_SIDE_TWO_LEVEL;
if (!get_stmt()->is_merge_stmt() &&
get_optimizer_context().is_pdml_heap_table() && !is_index_maintenance) {
exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_RANDOM;
} else {
exch_info.dist_method_ = ObPQDistributeMethod::PARTITION_HASH;
}
exch_info.dist_method_ = can_use_random
? ObPQDistributeMethod::PARTITION_RANDOM
: ObPQDistributeMethod::PARTITION_HASH;
LOG_TRACE("partition level is two, use pkey reshuffle method");
} else if (share::schema::PARTITION_LEVEL_ZERO == part_level) {
exch_info.repartition_type_ = OB_REPARTITION_NO_REPARTITION;
if (!get_stmt()->is_merge_stmt() &&
get_optimizer_context().is_pdml_heap_table() && !is_index_maintenance) {
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
} else {
exch_info.dist_method_ = ObPQDistributeMethod::HASH;
}
exch_info.dist_method_ = can_use_random
? ObPQDistributeMethod::RANDOM
: ObPQDistributeMethod::HASH;
LOG_TRACE("partition level is zero, use reduce reshuffle method");
} else {
ret = OB_ERR_UNEXPECTED;
@ -976,6 +977,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_delete(bool is_index_maintain,
IndexDMLInfo *index_dml_info)
{
int ret = OB_SUCCESS;
bool need_duplicate_date = false;
ObExchangeInfo exch_info;
ObSEArray<ObRawExpr*, 1> dummy_filters;
ObShardingInfo *source_sharding = NULL;
@ -987,7 +989,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_delete(bool is_index_maintain,
LOG_WARN("get unexpected error", K(get_stmt()), K(index_dml_info), K(ret));
} else if (OB_FAIL(tmp.push_back(index_dml_info))) {
LOG_WARN("failed to push back index dml info", K(ret));
} else if (OB_FAIL(check_table_rowkey_distinct(tmp))) {
} else if (OB_FAIL(!is_index_maintain && check_table_rowkey_distinct(tmp, need_duplicate_date))) {
LOG_WARN("failed to check table rowkey distinct", K(ret));
} else if (OB_FAIL(calculate_table_location_and_sharding(
*get_stmt(),
@ -1008,6 +1010,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_delete(bool is_index_maintain,
*source_table_partition,
*index_dml_info,
is_index_maintain,
need_duplicate_date,
exch_info))) {
LOG_WARN("failed to compute pdml exchange info for delete/update operator", K(ret));
} else {
@ -1483,14 +1486,20 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_update(bool is_index_maintenance,
IndexDMLInfo *index_dml_info)
{
int ret = OB_SUCCESS;
bool need_duplicate_date = false;
ObExchangeInfo exch_info;
ObSEArray<ObRawExpr*, 1> dummy_filters;
ObShardingInfo *source_sharding = NULL;
ObTablePartitionInfo *source_table_partition = NULL;
ObSEArray<CandidatePlan, 8> best_plans;
ObSEArray<IndexDMLInfo *, 1> tmp;
if (OB_ISNULL(get_stmt()) || OB_ISNULL(index_dml_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(tmp.push_back(index_dml_info))) {
LOG_WARN("failed to push back index dml info", K(ret));
} else if (OB_FAIL(!is_index_maintenance && check_table_rowkey_distinct(tmp, need_duplicate_date))) {
LOG_WARN("failed to check table rowkey distinct", K(ret));
} else if (OB_FAIL(calculate_table_location_and_sharding(
*get_stmt(),
is_index_maintenance ? dummy_filters : get_stmt()->get_condition_exprs(),
@ -1510,6 +1519,7 @@ int ObDelUpdLogPlan::candi_allocate_one_pdml_update(bool is_index_maintenance,
*source_table_partition,
*index_dml_info,
is_index_maintenance,
need_duplicate_date,
exch_info))) {
LOG_WARN("failed to compute pdml exchange info for delete/update operator", K(ret));
} else {

View File

@ -34,7 +34,8 @@ public:
inline virtual const ObDelUpdStmt *get_stmt() const override
{ return static_cast<const ObDelUpdStmt *>(stmt_); }
int check_table_rowkey_distinct(const ObIArray<IndexDMLInfo *> &index_dml_infos);
int check_table_rowkey_distinct(const ObIArray<IndexDMLInfo *> &index_dml_infos,
bool &need_duplicate_date);
int check_fullfill_safe_update_mode(ObLogicalOperator *op);
@ -61,6 +62,7 @@ public:
const ObTablePartitionInfo &target_table_partition,
const IndexDMLInfo &index_dml_info,
bool is_index_maintenance,
bool need_duplicate_date,
ObExchangeInfo &exch_info);
int compute_hash_dist_exprs_for_pdml_del_upd(ObExchangeInfo &exch_info,

View File

@ -180,12 +180,13 @@ int ObDeleteLogPlan::candi_allocate_delete()
int ret = OB_SUCCESS;
ObSEArray<CandidatePlan, 8> candi_plans;
ObSEArray<CandidatePlan, 8> delete_plans;
bool need_duplicate_date = false;
const bool force_no_multi_part = get_log_plan_hint().no_use_distributed_dml();
const bool force_multi_part = get_log_plan_hint().use_distributed_dml();
OPT_TRACE("start generate normal insert plan");
OPT_TRACE("force no multi part:", force_no_multi_part);
OPT_TRACE("force multi part:", force_multi_part);
if (OB_FAIL(check_table_rowkey_distinct(index_dml_infos_))) {
if (OB_FAIL(check_table_rowkey_distinct(index_dml_infos_, need_duplicate_date))) {
LOG_WARN("failed to check table rowkey distinct", K(ret));
} else if (OB_FAIL(get_minimal_cost_candidates(candidates_.candidate_plans_, candi_plans))) {
LOG_WARN("failed to get minimal cost candidates", K(ret));

View File

@ -279,6 +279,7 @@ int ObMergeLogPlan::candi_allocate_pdml_merge()
*target_table_partition,
*index_dml_info,
false,
true, /* need_duplicate_date */
exch_info))) {
LOG_WARN("fail to compute exchange info for pdml merge", K(ret));
} else {

View File

@ -472,10 +472,6 @@ int ObOptimizer::check_pdml_enabled(const ObDMLStmt &stmt,
if (is_pk_auto_inc) {
ctx_.add_plan_note(PDML_DISABLED_BY_INSERT_PK_AUTO_INC);
}
} else if ((stmt.is_update_stmt() || stmt.is_delete_stmt())
&& static_cast<const ObDelUpdStmt &>(stmt).dml_source_from_join()
&& static_cast<const ObDelUpdStmt &>(stmt).is_dml_table_from_join()) {
can_use_pdml = false;
} else if (OB_FAIL(check_pdml_supported_feature(static_cast<const ObDelUpdStmt&>(stmt),
session, can_use_pdml))) {
LOG_WARN("failed to check pdml supported feature", K(ret));

View File

@ -263,6 +263,7 @@ int ObUpdateLogPlan::generate_normal_raw_plan()
int ObUpdateLogPlan::candi_allocate_update()
{
int ret = OB_SUCCESS;
bool need_duplicate_date = false;
ObConstRawExpr *lock_row_flag_expr = NULL;
ObSEArray<CandidatePlan, 8> candi_plans;
ObSEArray<CandidatePlan, 8> update_plans;
@ -271,7 +272,7 @@ int ObUpdateLogPlan::candi_allocate_update()
OPT_TRACE("start generate normal update plan");
OPT_TRACE("force no multi part:", force_no_multi_part);
OPT_TRACE("force multi part:", force_multi_part);
if (OB_FAIL(check_table_rowkey_distinct(index_dml_infos_))) {
if (OB_FAIL(check_table_rowkey_distinct(index_dml_infos_, need_duplicate_date))) {
LOG_WARN("failed to check table rowkey distinct", K(ret));
} else if (OB_FAIL(get_minimal_cost_candidates(candidates_.candidate_plans_,
candi_plans))) {