[CP] adjust allocate subplan filter

This commit is contained in:
chimyue 2024-11-28 04:15:24 +00:00 committed by ob-robot
parent 79eba2e1e9
commit 73809832cc
2 changed files with 227 additions and 74 deletions

View File

@ -8022,7 +8022,7 @@ int ObLogPlan::inner_candi_allocate_subplan_filter(ObIArray<ObLogPlan*> &subplan
is_update_set,
dist_methods,
subquery_plans))) {
LOG_WARN("failed to allocate subplan filter", K(ret));
LOG_WARN("failed to allocate subplan filter", K(ret), K(subquery_plans.count()));
} else if (!subquery_plans.empty()) {
LOG_TRACE("succeed to allocate subplan filter using hint", K(subquery_plans.count()), K(dist_methods));
OPT_TRACE("success to generate subplan filter plan with hint");
@ -8046,7 +8046,7 @@ int ObLogPlan::inner_candi_allocate_subplan_filter(ObIArray<ObLogPlan*> &subplan
is_update_set,
dist_methods,
subquery_plans))) {
LOG_WARN("failed to allocate subplan filter", K(ret));
LOG_WARN("failed to allocate subplan filter", K(ret), K(subquery_plans.count()));
} else {
LOG_TRACE("succeed to allocate subplan filter ignore hint", K(subquery_plans.count()), K(dist_methods));
OPT_TRACE("success to generate subplan filter plan ignore hint");
@ -8201,78 +8201,86 @@ int ObLogPlan::inner_candi_allocate_subplan_filter(ObIArray<ObSEArray<CandidateP
ObIArray<CandidatePlan> &subquery_plans)
{
int ret = OB_SUCCESS;
CandidatePlan candidate_plan;
ObSEArray<int64_t, 4> move_pos;
ObSEArray<ObLogicalOperator*, 4> child_ops;
ObSEArray<ObLogicalOperator*, 4> dist_child_ops;
// generate subplan filter
for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
candidate_plan = candidates_.candidate_plans_.at(i);
OPT_TRACE("generate subplan filter for plan:", candidate_plan);
if (OB_ISNULL(candidate_plan.plan_tree_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(candidate_plan.plan_tree_), K(ret));
} else {
bool has_next = true;
move_pos.reuse();
for (int64_t j = 0; OB_SUCC(ret) && j < best_list.count(); j++) {
ret = move_pos.push_back(0);
}
// get child ops to generate plan
while (OB_SUCC(ret) && has_next) {
child_ops.reuse();
dist_child_ops.reuse();
// get child ops to generate plan
for (int64_t j = 0; OB_SUCC(ret) && j < move_pos.count(); j++) {
int64_t size = best_list.at(j).count();
if (OB_UNLIKELY(move_pos.at(j) < 0 || move_pos.at(j) >= size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected array count", K(size), K(move_pos.at(i)), K(ret));
} else if (OB_FAIL(child_ops.push_back(best_list.at(j).at(move_pos.at(j)).plan_tree_))) {
LOG_WARN("failed to push back child ops", K(ret));
} else if (OB_FAIL(dist_child_ops.push_back(dist_best_list.at(j).at(move_pos.at(j)).plan_tree_))) {
LOG_WARN("failed to push back child ops", K(ret));
} else { /*do nothing*/ }
}
// create subplan filter plan
if (OB_SUCC(ret)) {
CandidatePlan curr_candidate_plan;
curr_candidate_plan.plan_tree_ = candidate_plan.plan_tree_;
int64_t cur_dist_methods = dist_methods;
if (OB_FAIL(get_subplan_filter_distributed_method(curr_candidate_plan.plan_tree_,
child_ops,
if (query_refs.count() > 3) {
if (OB_FAIL(inner_candi_allocate_massive_subplan_filter(best_list,
dist_best_list,
query_refs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
filters,
for_cursor_expr,
!onetime_idxs.is_empty(),
cur_dist_methods))) {
LOG_WARN("failed to get subplan filter distributed method", K(ret));
} else if (0 == cur_dist_methods) {
/* do nothing */
} else if (OB_FAIL(create_subplan_filter_plan(curr_candidate_plan.plan_tree_,
child_ops,
dist_child_ops,
query_refs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
cur_dist_methods,
filters,
is_update_set))) {
LOG_WARN("failed to create subplan filter plan", K(ret));
} else if (OB_FAIL(subquery_plans.push_back(curr_candidate_plan))) {
LOG_WARN("failed to push back subquery plans", K(ret));
} else { /*do nothing*/ }
is_update_set,
dist_methods,
subquery_plans))) {
LOG_WARN("failed to candi allocate massive subplan filter", K(ret));
}
} else {
CandidatePlan candidate_plan;
ObSEArray<int64_t, 4> move_pos;
ObSEArray<ObLogicalOperator*, 4> child_ops;
ObSEArray<ObLogicalOperator*, 4> dist_child_ops;
// generate subplan filter
for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
candidate_plan = candidates_.candidate_plans_.at(i);
OPT_TRACE("generate subplan filter for plan:", candidate_plan);
if (OB_ISNULL(candidate_plan.plan_tree_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(candidate_plan.plan_tree_), K(ret));
} else {
bool has_next = true;
move_pos.reuse();
for (int64_t j = 0; OB_SUCC(ret) && j < best_list.count(); j++) {
ret = move_pos.push_back(0);
}
// reset pos for next generation
if (OB_SUCC(ret)) {
has_next = false;
for (int64_t j = move_pos.count() - 1; !has_next && OB_SUCC(ret) && j >= 0; j--) {
if (move_pos.at(j) < best_list.at(j).count() - 1) {
++move_pos.at(j);
has_next = true;
for (int64_t k = j + 1; k < move_pos.count(); k++) {
move_pos.at(k) = 0;
// get child ops to generate plan
while (OB_SUCC(ret) && has_next) {
child_ops.reuse();
dist_child_ops.reuse();
// get child ops to generate plan
for (int64_t j = 0; OB_SUCC(ret) && j < move_pos.count(); j++) {
int64_t size = best_list.at(j).count();
if (OB_UNLIKELY(move_pos.at(j) < 0 || move_pos.at(j) >= size)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected array count", K(size), K(move_pos.at(i)), K(ret));
} else if (OB_FAIL(child_ops.push_back(best_list.at(j).at(move_pos.at(j)).plan_tree_))) {
LOG_WARN("failed to push back child ops", K(ret));
} else if (OB_FAIL(dist_child_ops.push_back(dist_best_list.at(j).at(move_pos.at(j)).plan_tree_))) {
LOG_WARN("failed to push back child ops", K(ret));
} else { /*do nothing*/ }
}
// create subplan filter plan
if (OB_SUCC(ret)) {
CandidatePlan curr_candidate_plan(candidate_plan.plan_tree_);
if (OB_FAIL(create_subplan_filter_plan(curr_candidate_plan.plan_tree_,
child_ops,
dist_child_ops,
query_refs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
dist_methods,
filters,
is_update_set,
for_cursor_expr))) {
LOG_WARN("failed to create subplan filter plan", K(ret));
} else if (NULL != curr_candidate_plan.plan_tree_
&& OB_FAIL(subquery_plans.push_back(curr_candidate_plan))) {
LOG_WARN("failed to push back subquery plans", K(ret));
} else { /*do nothing*/ }
}
// reset pos for next generation
if (OB_SUCC(ret)) {
has_next = false;
for (int64_t j = move_pos.count() - 1; !has_next && OB_SUCC(ret) && j >= 0; j--) {
if (move_pos.at(j) < best_list.at(j).count() - 1) {
++move_pos.at(j);
has_next = true;
for (int64_t k = j + 1; k < move_pos.count(); k++) {
move_pos.at(k) = 0;
}
}
}
}
@ -8283,6 +8291,126 @@ int ObLogPlan::inner_candi_allocate_subplan_filter(ObIArray<ObSEArray<CandidateP
return ret;
}
int ObLogPlan::inner_candi_allocate_massive_subplan_filter(ObIArray<ObSEArray<CandidatePlan,4>> &best_list,
ObIArray<ObSEArray<CandidatePlan,4>> &dist_best_list,
ObIArray<ObQueryRefRawExpr *> &query_refs,
ObIArray<ObExecParamRawExpr *> &params,
ObIArray<ObExecParamRawExpr *> &onetime_exprs,
ObBitSet<> &initplan_idxs,
ObBitSet<> &onetime_idxs,
const ObIArray<ObRawExpr *> &filters,
const bool for_cursor_expr,
const bool is_update_set,
const int64_t dist_methods,
ObIArray<CandidatePlan> &subquery_plans)
{
int ret = OB_SUCCESS;
ObSEArray<ObSEArray<ObLogicalOperator*, 4>, 3> child_ops;
ObSEArray<ObSEArray<ObLogicalOperator*, 4>, 3> dist_child_ops;
if (OB_UNLIKELY(best_list.count() != dist_best_list.count() || best_list.count() != query_refs.count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected array count", K(best_list.count()), K(dist_best_list.count()), K(query_refs.count()), K(ret));
} else if (OB_FAIL(child_ops.prepare_allocate(3))
|| OB_FAIL(dist_child_ops.prepare_allocate(3))) {
LOG_WARN("fail to prepare allocate", K(ret));
} else {
bool exist_das_plan = true;
bool exist_px_plan = true;
ObLogicalOperator *cur_child = NULL;
int64_t das_best_pos = OB_INVALID_INDEX;
int64_t px_best_pos = OB_INVALID_INDEX;
int64_t best_pos = OB_INVALID_INDEX;
for (int64_t i = 0; OB_SUCC(ret) && i < best_list.count(); ++i) {
das_best_pos = OB_INVALID_INDEX;
px_best_pos = OB_INVALID_INDEX;
best_pos = OB_INVALID_INDEX;
if (OB_UNLIKELY(best_list.at(i).count() != dist_best_list.at(i).count())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected array count", K(i), K(best_list.at(i).count()), K(dist_best_list.at(i).count()), K(ret));
}
for (int64_t j = 0; OB_SUCC(ret) && j < best_list.at(i).count(); ++j) {
if (OB_ISNULL(cur_child = best_list.at(i).at(j).plan_tree_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(i), K(j), K(ret), K(best_list));
} else {
if (cur_child->is_match_all()
&& (OB_INVALID_INDEX == das_best_pos
|| best_list.at(i).at(das_best_pos).plan_tree_->get_cost() > cur_child->get_cost())) {
das_best_pos = j;
}
if ((!cur_child->is_match_all() || !cur_child->get_contains_das_op())
&& (OB_INVALID_INDEX == px_best_pos
|| best_list.at(i).at(px_best_pos).plan_tree_->get_cost() > cur_child->get_cost())) {
px_best_pos = j;
}
if (OB_INVALID_INDEX == best_pos
|| best_list.at(i).at(best_pos).plan_tree_->get_cost() > cur_child->get_cost()) {
best_pos = j;
}
}
}
if (OB_FAIL(ret) || !exist_das_plan) {
} else if (OB_INVALID_INDEX == das_best_pos) {
exist_das_plan = false;
child_ops.at(0).reuse();
dist_child_ops.at(0).reuse();
} else if (OB_FAIL(child_ops.at(0).push_back(best_list.at(i).at(das_best_pos).plan_tree_))
|| OB_FAIL(dist_child_ops.at(0).push_back(dist_best_list.at(i).at(das_best_pos).plan_tree_))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_FAIL(ret) || !exist_px_plan) {
} else if (OB_INVALID_INDEX == px_best_pos) {
exist_px_plan = false;
child_ops.at(1).reuse();
dist_child_ops.at(1).reuse();
} else if (OB_FAIL(child_ops.at(1).push_back(best_list.at(i).at(px_best_pos).plan_tree_))
|| OB_FAIL(dist_child_ops.at(1).push_back(dist_best_list.at(i).at(px_best_pos).plan_tree_))) {
LOG_WARN("failed to push back", K(ret));
}
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(OB_INVALID_INDEX == best_pos)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected best pos", K(ret), K(best_pos));
} else if (OB_FAIL(child_ops.at(2).push_back(best_list.at(i).at(best_pos).plan_tree_))
|| OB_FAIL(dist_child_ops.at(2).push_back(dist_best_list.at(i).at(best_pos).plan_tree_))) {
LOG_WARN("failed to push back", K(ret));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); ++i) {
OPT_TRACE("generate subplan filter for plan:", candidates_.candidate_plans_.at(i));
for (int64_t j = 0; OB_SUCC(ret) && j <= 2; ++j) {
CandidatePlan curr_candidate_plan(candidates_.candidate_plans_.at(i).plan_tree_);
if ((0 == j && !exist_das_plan)
|| (1 == j && !exist_px_plan)
|| (2 == j && (exist_das_plan || exist_px_plan))) {
/* do nothing */
} else if (OB_FAIL(create_subplan_filter_plan(curr_candidate_plan.plan_tree_,
child_ops.at(j),
dist_child_ops.at(j),
query_refs,
params,
onetime_exprs,
initplan_idxs,
onetime_idxs,
dist_methods,
filters,
is_update_set,
for_cursor_expr))) {
LOG_WARN("failed to create subplan filter plan", K(ret));
} else if (NULL != curr_candidate_plan.plan_tree_
&& OB_FAIL(subquery_plans.push_back(curr_candidate_plan))) {
LOG_WARN("failed to push back subquery plans", K(ret));
} else { /*do nothing*/ }
}
}
}
return ret;
}
/*
* @param for_on_condition
* The subquery on the on condition will try to generate the subplan filter multiple times
@ -8506,14 +8634,25 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
const ObBitSet<> &onetime_idxs,
const int64_t dist_methods,
const ObIArray<ObRawExpr*> &filters,
const bool is_update_set)
const bool is_update_set,
const bool for_cursor_expr)
{
int ret = OB_SUCCESS;
ObExchangeInfo exch_info;
const DistAlgo dist_algo = get_dist_algo(dist_methods);
int64_t cur_dist_methods = dist_methods;
DistAlgo dist_algo = DIST_INVALID_METHOD;
if (OB_ISNULL(top)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
} else if (OB_FAIL(get_subplan_filter_distributed_method(top,
subquery_ops,
params,
for_cursor_expr,
!onetime_idxs.is_empty(),
cur_dist_methods))) {
LOG_WARN("failed to get subplan filter distributed method", K(ret));
} else if (DIST_INVALID_METHOD == (dist_algo = get_dist_algo(cur_dist_methods))) {
top = NULL;
} else if (DistAlgo::DIST_BASIC_METHOD == dist_algo ||
DistAlgo::DIST_PARTITION_WISE == dist_algo ||
DistAlgo::DIST_NONE_ALL == dist_algo) {

View File

@ -1092,6 +1092,19 @@ public:
const int64_t dist_methods,
ObIArray<CandidatePlan> &subquery_plans);
int inner_candi_allocate_massive_subplan_filter(ObIArray<ObSEArray<CandidatePlan,4>> &best_list,
ObIArray<ObSEArray<CandidatePlan,4>> &dist_best_list,
ObIArray<ObQueryRefRawExpr *> &query_refs,
ObIArray<ObExecParamRawExpr *> &params,
ObIArray<ObExecParamRawExpr *> &onetime_exprs,
ObBitSet<> &initplan_idxs,
ObBitSet<> &onetime_idxs,
const ObIArray<ObRawExpr *> &filters,
const bool for_cursor_expr,
const bool is_update_set,
const int64_t dist_methods,
ObIArray<CandidatePlan> &subquery_plans);
int prepare_subplan_candidate_list(ObIArray<ObLogPlan*> &subplans,
ObIArray<ObExecParamRawExpr *> &params,
ObIArray<ObSEArray<CandidatePlan, 4>> &best_list,
@ -1128,7 +1141,8 @@ public:
const ObBitSet<> &onetime_idxs,
const int64_t dist_methods,
const ObIArray<ObRawExpr*> &filters,
const bool is_update_set);
const bool is_update_set,
const bool for_cursor_expr);
int check_contains_recursive_cte(ObIArray<ObLogicalOperator*> &child_ops,
bool &is_recursive_cte);