fix set partition wise bug
This commit is contained in:
@ -2074,65 +2074,82 @@ int ObSelectLogPlan::create_union_all_plan(const ObIArray<ObLogicalOperator*> &c
|
||||
} else {
|
||||
random_none_idx = OB_INVALID_INDEX;
|
||||
}
|
||||
OPT_TRACE("start create unoin all plan");
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_BASIC_METHOD)) {
|
||||
bool is_basic = false;
|
||||
OPT_TRACE("check match basic method");
|
||||
if (OB_FAIL(ObOptimizerUtil::check_basic_sharding_info(get_optimizer_context().get_local_server_addr(),
|
||||
child_ops,
|
||||
is_basic))) {
|
||||
LOG_WARN("failed to check basic sharding info", K(ret));
|
||||
} else if (!is_basic) {
|
||||
set_dist_methods &= ~DIST_BASIC_METHOD;
|
||||
OPT_TRACE("will not use basic method");
|
||||
} else if (OB_FAIL(set_child_ops.assign(child_ops))) {
|
||||
LOG_WARN("failed to assign child ops", K(ret));
|
||||
} else {
|
||||
set_dist_methods = DistAlgo::DIST_BASIC_METHOD;
|
||||
OPT_TRACE("will use basic method, ignore other method");
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_PARTITION_WISE)) {
|
||||
OPT_TRACE("check match partition wise");
|
||||
if (OB_FAIL(check_if_union_all_match_partition_wise(child_ops, is_partition_wise))) {
|
||||
LOG_WARN("failed to check if union all match partition wise", K(ret));
|
||||
} else if (!is_partition_wise) {
|
||||
set_dist_methods &= ~DIST_PARTITION_WISE;
|
||||
OPT_TRACE("will not use partition wise");
|
||||
} else if (OB_FAIL(set_child_ops.assign(child_ops))) {
|
||||
LOG_WARN("failed to assign child ops", K(ret));
|
||||
} else {
|
||||
set_dist_methods = DistAlgo::DIST_PARTITION_WISE;
|
||||
OPT_TRACE("will use partition wise, ignore other method");
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_EXT_PARTITION_WISE)) {
|
||||
OPT_TRACE("check match extend partition wise");
|
||||
if (OB_FAIL(check_if_union_all_match_extended_partition_wise(child_ops, is_ext_partition_wise))) {
|
||||
LOG_WARN("failed to check if union all match extended partition wise", K(ret));
|
||||
} else if (!is_ext_partition_wise) {
|
||||
set_dist_methods &= ~DIST_EXT_PARTITION_WISE;
|
||||
OPT_TRACE("will not use extend partition wise");
|
||||
} else if (OB_FAIL(set_child_ops.assign(child_ops))) {
|
||||
LOG_WARN("failed to assign child ops", K(ret));
|
||||
} else {
|
||||
set_dist_methods = DistAlgo::DIST_EXT_PARTITION_WISE;
|
||||
OPT_TRACE("will use extend partition wise, ignore other method");
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_SET_PARTITION_WISE)) {
|
||||
OPT_TRACE("check match set partition wise");
|
||||
if (OB_FAIL(check_if_union_all_match_set_partition_wise(child_ops, is_set_partition_wise))) {
|
||||
LOG_WARN("failed to check if union all match set partition wise", K(ret));
|
||||
} else if (!is_set_partition_wise) {
|
||||
set_dist_methods &= ~DIST_SET_PARTITION_WISE;
|
||||
OPT_TRACE("will not use set partition wise");
|
||||
} else if (OB_FAIL(set_child_ops.assign(child_ops))) {
|
||||
LOG_WARN("failed to assign child ops", K(ret));
|
||||
} else {
|
||||
set_dist_methods = DistAlgo::DIST_SET_PARTITION_WISE;
|
||||
OPT_TRACE("will use set partition wise, ignore other method");
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_SET_RANDOM)) {
|
||||
OPT_TRACE("check match set random method");
|
||||
// has distributed child, use random distribution
|
||||
int64_t largest_pos = -1;
|
||||
if (OB_FAIL(get_largest_sharding_child(child_ops, random_none_idx, largest_pos))) {
|
||||
LOG_WARN("failed to get largest sharding children", K(ret));
|
||||
} else if (-1 == largest_pos) {
|
||||
set_dist_methods &= ~DistAlgo::DIST_SET_RANDOM;
|
||||
OPT_TRACE("all children`s sharding is distribute, no need shuffle");
|
||||
OPT_TRACE("will not use set random");
|
||||
} else {
|
||||
OPT_TRACE("will use set random, ignore other method");
|
||||
ObExchangeInfo exch_info;
|
||||
exch_info.dist_method_ = ObPQDistributeMethod::RANDOM;
|
||||
set_dist_methods = DistAlgo::DIST_SET_RANDOM;
|
||||
@ -2153,6 +2170,7 @@ int ObSelectLogPlan::create_union_all_plan(const ObIArray<ObLogicalOperator*> &c
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && (set_dist_methods & DistAlgo::DIST_PULL_TO_LOCAL)) {
|
||||
OPT_TRACE("will use pull to local method");
|
||||
ObExchangeInfo exch_info;
|
||||
set_dist_methods = DistAlgo::DIST_PULL_TO_LOCAL;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < child_ops.count(); i++) {
|
||||
@ -2245,6 +2263,7 @@ int ObSelectLogPlan::check_if_union_all_match_set_partition_wise(const ObIArray<
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSEArray<ObAddr, 4> first_server_list;
|
||||
bool is_inherit_from_access_all = false;
|
||||
is_union_all_set_pw = true;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && is_union_all_set_pw && i < child_ops.count(); i++) {
|
||||
ObLogicalOperator *child_op = NULL;
|
||||
@ -2255,17 +2274,33 @@ int ObSelectLogPlan::check_if_union_all_match_set_partition_wise(const ObIArray<
|
||||
LOG_WARN("invalid input", K(ret), K(child_op), K(child_sharding));
|
||||
} else if (!child_sharding->is_distributed()) {
|
||||
is_union_all_set_pw = false;
|
||||
OPT_TRACE("not distributed sharding, can not use set partition wise");
|
||||
} else if (i == 0) {
|
||||
if (OB_FAIL(first_server_list.assign(child_op->get_server_list()))) {
|
||||
if (OB_FAIL(check_sharding_inherit_from_access_all(child_op,
|
||||
is_inherit_from_access_all))) {
|
||||
LOG_WARN("failed to check sharding inherit from access all", K(ret));
|
||||
} else if (is_inherit_from_access_all) {
|
||||
//partition wise flag conflict with access all flag
|
||||
is_union_all_set_pw = false;
|
||||
} else if (OB_FAIL(first_server_list.assign(child_op->get_server_list()))) {
|
||||
LOG_WARN("failed to get first server list", K(ret));
|
||||
} else {
|
||||
LOG_TRACE("succ to check union all matching set pw", K(first_server_list));
|
||||
}
|
||||
} else if (OB_FAIL(check_sharding_inherit_from_access_all(child_op,
|
||||
is_inherit_from_access_all))) {
|
||||
LOG_WARN("failed to check sharding inherit from access all", K(ret));
|
||||
} else if (is_inherit_from_access_all) {
|
||||
//partition wise flag conflict with access all flag
|
||||
is_union_all_set_pw = false;
|
||||
} else if (OB_FAIL(ObShardingInfo::is_physically_equal_serverlist(first_server_list,
|
||||
child_op->get_server_list(),
|
||||
is_union_all_set_pw))) {
|
||||
LOG_WARN("failed to check if equal server list", K(ret));
|
||||
} else {
|
||||
if (!is_union_all_set_pw) {
|
||||
OPT_TRACE("server list not equal, can not use set partition wise");
|
||||
}
|
||||
LOG_TRACE("succ to check union all matching set pw",
|
||||
K(first_server_list), K(child_op->get_server_list()), K(is_union_all_set_pw));
|
||||
}
|
||||
@ -2273,6 +2308,36 @@ int ObSelectLogPlan::check_if_union_all_match_set_partition_wise(const ObIArray<
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSelectLogPlan::check_sharding_inherit_from_access_all(ObLogicalOperator* op,
|
||||
bool &is_inherit_from_access_all)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
is_inherit_from_access_all = false;
|
||||
if (OB_ISNULL(op)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpect null op", K(ret));
|
||||
} else if (log_op_def::LOG_JOIN == op->get_type()) {
|
||||
ObLogJoin *log_join = static_cast<ObLogJoin*>(op);
|
||||
if (DIST_BC2HOST_NONE == log_join->get_dist_method() &&
|
||||
log_join->is_nlj_with_param_down()) {
|
||||
is_inherit_from_access_all = true;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) &&
|
||||
!is_inherit_from_access_all &&
|
||||
op->get_inherit_sharding_index() != -1) {
|
||||
int64_t idx = op->get_inherit_sharding_index();
|
||||
if (idx < 0 || idx >= op->get_num_of_child()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("unexpect inherit sharding index", K(ret));
|
||||
} else if (OB_FAIL(SMART_CALL(check_sharding_inherit_from_access_all(op->get_child(idx),
|
||||
is_inherit_from_access_all)))) {
|
||||
LOG_WARN("failed to check sharding inherit from bc2host", K(ret));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSelectLogPlan::check_if_union_all_match_extended_partition_wise(const ObIArray<ObLogicalOperator*> &child_ops,
|
||||
bool &is_union_all_ext_pw)
|
||||
{
|
||||
@ -2288,6 +2353,7 @@ int ObSelectLogPlan::check_if_union_all_match_extended_partition_wise(const ObIA
|
||||
LOG_WARN("invalid input", K(ret), K(child_op), K(child_sharding));
|
||||
} else if (!child_sharding->is_distributed()) {
|
||||
is_union_all_ext_pw = false;
|
||||
OPT_TRACE("not distribute sharding, can not use extend partition wise");
|
||||
} else if (i == 0) {
|
||||
if (OB_FAIL(first_server_list.assign(child_op->get_server_list()))) {
|
||||
LOG_WARN("failed to get first server list", K(ret));
|
||||
@ -2299,6 +2365,9 @@ int ObSelectLogPlan::check_if_union_all_match_extended_partition_wise(const ObIA
|
||||
is_union_all_ext_pw))) {
|
||||
LOG_WARN("failed to check if both are shuffled server list", K(ret));
|
||||
} else {
|
||||
if (!is_union_all_ext_pw) {
|
||||
OPT_TRACE("server list not match, can not use extend partition wise");
|
||||
}
|
||||
LOG_TRACE("succ to check union all matching ext pw",
|
||||
K(first_server_list), K(child_op->get_server_list()), K(is_union_all_ext_pw));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user