修复partition wise计划并行度计算不对,导致计划走偏的问题

This commit is contained in:
zzg19950727 2024-12-10 04:15:12 +00:00 committed by ob-robot
parent a2dd132cc5
commit 13bc8850d3
27 changed files with 1347 additions and 606 deletions

View File

@ -94,7 +94,7 @@ int ObSqlPlan::store_sql_plan(ObLogPlan* log_plan, ObPhysicalPlan* phy_plan)
} else if (OB_FAIL(init_buffer(plan_text))) {
LOG_WARN("failed to init buffer", K(ret));
} else if (OB_FAIL(get_sql_plan_infos(plan_text,
log_plan,
log_plan->get_plan_root(),
sql_plan_infos))) {
LOG_WARN("failed to get sql plan infos", K(ret));
} else if (OB_FAIL(compress_plan.compress_logical_plan(allocator_, sql_plan_infos))) {
@ -127,7 +127,7 @@ int ObSqlPlan::store_sql_plan_for_explain(ObExecContext *ctx,
if (OB_FAIL(init_buffer(plan_text))) {
LOG_WARN("failed to init buffer", K(ret));
} else if (OB_FAIL(get_sql_plan_infos(plan_text,
plan,
plan->get_plan_root(),
sql_plan_infos))) {
LOG_WARN("failed to get sql plan infos", K(ret));
}
@ -164,7 +164,7 @@ int ObSqlPlan::store_sql_plan_for_explain(ObExecContext *ctx,
return ret;
}
int ObSqlPlan::print_sql_plan(ObLogPlan* plan,
int ObSqlPlan::print_sql_plan(ObLogicalOperator* plan_top,
ExplainType type,
const ObExplainDisplayOpt& option,
ObIArray<common::ObString> &plan_strs)
@ -177,7 +177,7 @@ int ObSqlPlan::print_sql_plan(ObLogPlan* plan,
if (OB_FAIL(init_buffer(plan_text))) {
LOG_WARN("failed to init buffer", K(ret));
} else if (OB_FAIL(get_sql_plan_infos(plan_text,
plan,
plan_top,
sql_plan_infos))) {
LOG_WARN("failed to get sql plan infos", K(ret));
} else if (OB_FAIL(format_sql_plan(sql_plan_infos,
@ -614,16 +614,16 @@ int ObSqlPlan::inner_escape_quotes(char* &ptr, int64_t &length)
}
int ObSqlPlan::get_sql_plan_infos(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObIArray<ObSqlPlanItem*> &sql_plan_infos)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(plan)) {
if (OB_ISNULL(plan_top)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null plan", K(ret));
//get operator tree info
} else if (OB_FAIL(get_plan_tree_infos(plan_text,
plan->get_plan_root(),
plan_top,
sql_plan_infos,
0,
1,
@ -634,19 +634,19 @@ int ObSqlPlan::get_sql_plan_infos(PlanText &plan_text,
LOG_WARN("unexpect null plan", K(ret));
//get used hint、outline info
} else if (OB_FAIL(get_plan_used_hint_info(plan_text,
plan,
plan_top,
sql_plan_infos.at(0)))) {
LOG_WARN("failed to get plan outline info", K(ret));
} else if (OB_FAIL(get_qb_name_trace(plan_text,
plan,
plan_top->get_plan(),
sql_plan_infos.at(0)))) {
LOG_WARN("failed to get qb name trace", K(ret));
} else if (OB_FAIL(get_plan_outline_info(plan_text,
plan,
plan_top,
sql_plan_infos.at(0)))) {
LOG_WARN("failed to get plan outline info", K(ret));
} else if (OB_FAIL(get_plan_other_info(plan_text,
plan,
plan_top->get_plan(),
sql_plan_infos.at(0)))) {
LOG_WARN("failed to get plan other info", K(ret));
}
@ -694,16 +694,16 @@ int ObSqlPlan::get_plan_tree_infos(PlanText &plan_text,
}
int ObSqlPlan::get_plan_used_hint_info(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObSqlPlanItem* sql_plan_item)
{
int ret = OB_SUCCESS;
const ObQueryCtx *query_ctx = NULL;
//print_plan_tree:print_used_hint
if (OB_ISNULL(plan) || OB_ISNULL(sql_plan_item) ||
OB_ISNULL(query_ctx = plan->get_optimizer_context().get_query_ctx())) {
if (OB_ISNULL(plan_top) || OB_ISNULL(plan_top->get_plan()) || OB_ISNULL(sql_plan_item) ||
OB_ISNULL(query_ctx = plan_top->get_plan()->get_optimizer_context().get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL", K(ret), K(plan), K(query_ctx));
LOG_WARN("unexpected NULL", K(ret), K(plan_top), K(query_ctx));
} else {
const ObQueryHint &query_hint = query_ctx->get_query_hint();
PlanText temp_text;
@ -714,7 +714,7 @@ int ObSqlPlan::get_plan_used_hint_info(PlanText &plan_text,
BUF_PRINT_CONST_STR(" /*+", temp_text);
BUF_PRINT_CONST_STR(NEW_LINE, temp_text);
BUF_PRINT_CONST_STR(OUTPUT_PREFIX, temp_text);
if (OB_FAIL(get_plan_tree_used_hint(temp_text, plan->get_plan_root()))) {
if (OB_FAIL(get_plan_tree_used_hint(temp_text, plan_top))) {
LOG_WARN("failed to get plan tree used hint", K(ret));
} else if (OB_FAIL(query_hint.print_qb_name_hints(temp_text))) {
LOG_WARN("failed to print qb name hints", K(ret));
@ -789,15 +789,15 @@ int ObSqlPlan::get_qb_name_trace(PlanText &plan_text,
}
int ObSqlPlan::get_plan_outline_info(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObSqlPlanItem* sql_plan_item)
{
int ret = OB_SUCCESS;
const ObQueryCtx *query_ctx = NULL;
if (OB_ISNULL(plan) || OB_ISNULL(sql_plan_item) ||
OB_ISNULL(query_ctx = plan->get_optimizer_context().get_query_ctx())) {
if (OB_ISNULL(plan_top) || OB_ISNULL(plan_top->get_plan()) || OB_ISNULL(sql_plan_item) ||
OB_ISNULL(query_ctx = plan_top->get_plan()->get_optimizer_context().get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL", K(ret), K(plan), K(query_ctx));
LOG_WARN("unexpected NULL", K(ret), K(plan_top), K(query_ctx));
} else {
PlanText temp_text;
temp_text.is_used_hint_ = false;
@ -810,13 +810,13 @@ int ObSqlPlan::get_plan_outline_info(PlanText &plan_text,
BUF_PRINT_CONST_STR(OUTPUT_PREFIX, temp_text);
BUF_PRINT_CONST_STR("BEGIN_OUTLINE_DATA", temp_text);
const ObQueryHint &query_hint = query_ctx->get_query_hint();
if (OB_FAIL(reset_plan_tree_outline_flag(plan->get_plan_root()))) {
if (OB_FAIL(reset_plan_tree_outline_flag(plan_top))) {
LOG_WARN("failed to reset plan tree outline flag", K(ret));
} else if (OB_FAIL(get_plan_tree_outline(temp_text, plan->get_plan_root()))) {
} else if (OB_FAIL(get_plan_tree_outline(temp_text, plan_top))) {
LOG_WARN("failed to get plan tree outline", K(ret));
} else if (OB_FAIL(query_hint.print_transform_hints(temp_text))) {
LOG_WARN("failed to print all transform hints", K(ret));
} else if (OB_FAIL(get_global_hint_outline(temp_text, *plan))) {
} else if (OB_FAIL(get_global_hint_outline(temp_text, *plan_top->get_plan()))) {
LOG_WARN("failed to get plan global hint outline", K(ret));
} else {
BUF_PRINT_CONST_STR(NEW_LINE, temp_text);
@ -1114,7 +1114,11 @@ int ObSqlPlan::format_sql_plan(ObIArray<ObSqlPlanItem*> &sql_plan_infos,
} else if (sql_plan_infos.empty()) {
//do nothing
} else {
if (EXPLAIN_BASIC == type) {
if (EXPLAIN_PLAN_TABLE == type) {
if (OB_FAIL(format_plan_table(sql_plan_infos, option, plan_text))) {
LOG_WARN("failed to print plan", K(ret));
}
} else if (EXPLAIN_BASIC == type) {
if (OB_FAIL(format_basic_plan_table(sql_plan_infos, option, plan_text))) {
LOG_WARN("failed to print plan", K(ret));
} else if (OB_FAIL(format_plan_output(sql_plan_infos, plan_text))) {

View File

@ -136,7 +136,7 @@ public:
const ObExplainDisplayOpt& option,
ObIArray<common::ObString> &plan_strs);
int print_sql_plan(ObLogPlan* plan,
int print_sql_plan(ObLogicalOperator* plan_top,
ExplainType type,
const ObExplainDisplayOpt& option,
ObIArray<common::ObString> &plan_strs);
@ -171,7 +171,7 @@ private:
int inner_escape_quotes(char* &ptr, int64_t &length);
int get_sql_plan_infos(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObIArray<ObSqlPlanItem*> &sql_plan_infos);
int get_plan_tree_infos(PlanText &plan_text,
@ -182,7 +182,7 @@ private:
bool is_last_child);
int get_plan_used_hint_info(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObSqlPlanItem* sql_plan_item);
static int get_plan_tree_used_hint(PlanText &plan_text,
@ -193,7 +193,7 @@ private:
ObSqlPlanItem* sql_plan_item);
int get_plan_outline_info(PlanText &plan_text,
ObLogPlan* plan,
ObLogicalOperator* plan_top,
ObSqlPlanItem* sql_plan_item);
static int reset_plan_tree_outline_flag(ObLogicalOperator* op);

View File

@ -531,10 +531,6 @@ int ObOptimizerTraceImpl::append(const OptSystemStat& stat)
int ObOptimizerTraceImpl::append(const ObLogPlan *log_plan)
{
int ret = OB_SUCCESS;
char *buf = NULL;
int64_t buf_len = 1024 * 1024;
ObExplainDisplayOpt option;
option.with_tree_line_ = true;
const ObLogPlan *target_plan = log_plan;
if (OB_NOT_NULL(target_plan) &&
target_plan->get_stmt()->is_explain_stmt()) {
@ -542,9 +538,11 @@ int ObOptimizerTraceImpl::append(const ObLogPlan *log_plan)
target_plan = op->get_explain_plan();
}
if (OB_NOT_NULL(target_plan)) {
ObExplainDisplayOpt option;
option.with_tree_line_ = true;
ObSqlPlan sql_plan(target_plan->get_allocator());
ObSEArray<common::ObString, 64> plan_strs;
if (OB_FAIL(sql_plan.print_sql_plan(const_cast<ObLogPlan*>(target_plan),
if (OB_FAIL(sql_plan.print_sql_plan(const_cast<ObLogicalOperator*>(target_plan->get_plan_root()),
EXPLAIN_EXTENDED,
option,
plan_strs))) {
@ -562,6 +560,34 @@ int ObOptimizerTraceImpl::append(const ObLogPlan *log_plan)
return ret;
}
int ObOptimizerTraceImpl::append(const ObLogicalOperator *plan_top)
{
int ret = OB_SUCCESS;
ObExplainDisplayOpt option;
option.with_tree_line_ = true;
if (OB_NOT_NULL(plan_top) && OB_NOT_NULL(plan_top->get_plan())) {
ObSqlPlan sql_plan(plan_top->get_plan()->get_allocator());
ObSEArray<common::ObString, 64> plan_strs;
if (OB_FAIL(sql_plan.print_sql_plan(const_cast<ObLogicalOperator*>(plan_top),
EXPLAIN_PLAN_TABLE,
option,
plan_strs))) {
LOG_WARN("failed to store sql plan", K(ret));
}
OPT_TRACE_TITLE("Query Plan");
new_line();
append_ptr(plan_top);
for (int64_t i = 0; OB_SUCC(ret) && i < plan_strs.count(); ++i) {
if (OB_FAIL(new_line())) {
LOG_WARN("failed to append msg", K(ret));
} else if (OB_FAIL(append(plan_strs.at(i)))) {
LOG_WARN("failed to append plan", K(ret));
}
}
}
return ret;
}
int ObOptimizerTraceImpl::append(const ObJoinOrder *join_order)
{
int ret = OB_SUCCESS;

View File

@ -36,6 +36,7 @@ class ObDMLStmt;
class ObSelectStmt;
class ObRawExpr;
class ObLogPlan;
class ObLogicalOperator;
class ObJoinOrder;
class Path;
class JoinPath;
@ -330,6 +331,7 @@ public:
////print plan info
/***********************************************/
int append(const ObLogPlan *log_plan);
int append(const ObLogicalOperator *plan_top);
int append(const ObJoinOrder *join_order);
int append(const Path *value);
int append(const JoinPath *value);

View File

@ -3491,7 +3491,7 @@ int ObSql::generate_plan(ParseResult &parse_result,
option.with_tree_line_ = false;
ObSqlPlan sql_plan(logical_plan->get_allocator());
ObSEArray<common::ObString, 32> plan_strs;
if (OB_TMP_FAIL(sql_plan.print_sql_plan(logical_plan,
if (OB_TMP_FAIL(sql_plan.print_sql_plan(logical_plan->get_plan_root(),
EXPLAIN_EXTENDED,
option,
plan_strs))) {

View File

@ -328,6 +328,7 @@ enum ExplainType
EXPLAIN_EXTENDED_NOADDR,
EXPLAIN_DBLINK_STMT,
EXPLAIN_HINT_FORMAT,
EXPLAIN_PLAN_TABLE
};
enum DiagnosticsType

View File

@ -10918,7 +10918,6 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
bool is_partition_wise = false;
bool is_ext_partition_wise = false;
bool right_is_base_table = false;
bool need_pull_to_local = false;
ObSEArray<ObRawExpr*, 8> target_part_keys;
ObShardingInfo *left_sharding = NULL;
ObShardingInfo *right_sharding = NULL;
@ -11150,11 +11149,9 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
} else if (is_partition_wise) {
bool need_reduce_dop = left_path.parallel_more_than_part_cnt()
|| right_path.parallel_more_than_part_cnt();
if (!need_reduce_dop && left_path.exchange_allocated_ == right_path.exchange_allocated_) {
if (!need_reduce_dop) {
distributed_methods = DIST_PARTITION_WISE;
OPT_TRACE("plan will use partition wise method");
} else {
need_pull_to_local = true;
}
} else {
distributed_methods &= ~DIST_PARTITION_WISE;
@ -11249,7 +11246,6 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
if (use_shared_hash_join && HASH_JOIN == join_algo) {
distributed_methods &= ~DIST_BC2HOST_NONE;
}
need_pull_to_local = right_path.exchange_allocated_;
}
}
// check if match hash none
@ -11290,7 +11286,6 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
OPT_TRACE("plan will use none partition method and prune none broadcast/hash method");
distributed_methods &= ~DIST_NONE_BROADCAST;
distributed_methods &= ~DIST_NONE_HASH;
need_pull_to_local = false;
}
}
// check if match none-hash
@ -11326,8 +11321,7 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
* if we have other parallel join methods, avoid pull to local execution,
* we may change this strategy in future
*/
if (OB_SUCC(ret) && distributed_methods != DIST_PULL_TO_LOCAL &&
!need_pull_to_local) {
if (OB_SUCC(ret) && distributed_methods != DIST_PULL_TO_LOCAL) {
distributed_methods &= ~DIST_PULL_TO_LOCAL;
OPT_TRACE("plan will not use pull to local method");
}

View File

@ -132,6 +132,10 @@ int ObLogDistinct::est_cost()
double distinct_cost = 0.0;
ObLogicalOperator *child = NULL;
double child_ndv = total_ndv_;
EstimateCostInfo param;
param.need_parallel_ = get_parallel();
double child_card = 0;
double child_cost = 0;
if (OB_ISNULL(child = get_child(ObLogicalOperator::first_child))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(child), K(ret));
@ -140,9 +144,14 @@ int ObLogDistinct::est_cost()
LOG_WARN("get unexpected total ndv", K(child_ndv), K(ret));
} else if (OB_FAIL(inner_est_cost(get_parallel(), child->get_card(), child_ndv, distinct_cost))) {
LOG_WARN("failed to est distinct cost", K(ret));
} else if (need_re_est_child_cost() &&
OB_FAIL(SMART_CALL(child->re_est_cost(param, child_card, child_cost)))) {
LOG_WARN("failed to re est child cost", K(ret));
} else if (!need_re_est_child_cost() &&
OB_FALSE_IT(child_cost=child->get_cost())) {
} else {
set_op_cost(distinct_cost);
set_cost(child->get_cost() + distinct_cost);
set_cost(child_cost + distinct_cost);
set_card(child_ndv);
}
return ret;
@ -452,5 +461,26 @@ int ObLogDistinct::compute_property()
return ret;
}
int ObLogDistinct::compute_op_parallel_and_server_info()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObLogicalOperator::compute_op_parallel_and_server_info())) {
LOG_WARN("failed to compute parallel and server info", K(ret));
} else if (is_partition_wise() && !is_push_down()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
}
return ret;
}
}
}

View File

@ -78,6 +78,7 @@ public:
virtual int get_card_without_filter(double &card) override;
virtual int check_use_child_ordering(bool &used, int64_t &inherit_child_ordering_index)override;
virtual int compute_property() override;
virtual int compute_op_parallel_and_server_info() override;
private:
common::ObSEArray<ObRawExpr*, 16, common::ModulePageAllocator, true> distinct_exprs_;

View File

@ -247,6 +247,9 @@ int ObLogGroupBy::est_cost()
double selectivity = 1.0;
double group_cost = 0.0;
ObLogicalOperator *child = get_child(ObLogicalOperator::first_child);
EstimateCostInfo param;
param.need_parallel_ = get_parallel();
double child_cost = 0;
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret), K(child));
@ -257,9 +260,14 @@ int ObLogGroupBy::est_cost()
child_ndv,
group_cost))) {
LOG_WARN("failed to est group by cost", K(ret));
} else if (need_re_est_child_cost() &&
OB_FAIL(SMART_CALL(child->re_est_cost(param, child_card, child_cost)))) {
LOG_WARN("failed to re est child cost", K(ret));
} else if (!need_re_est_child_cost() &&
OB_FALSE_IT(child_cost=child->get_cost())) {
} else {
set_card(child_ndv * selectivity);
set_cost(child->get_cost() + group_cost);
set_cost(child_cost + group_cost);
set_op_cost(group_cost);
}
return ret;
@ -966,4 +974,25 @@ int ObLogGroupBy::check_use_child_ordering(bool &used, int64_t &inherit_child_or
used = false;
}
return ret;
}
int ObLogGroupBy::compute_op_parallel_and_server_info()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObLogicalOperator::compute_op_parallel_and_server_info())) {
LOG_WARN("failed to compute parallel and server info", K(ret));
} else if (is_partition_wise() && !is_push_down()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
}
return ret;
}

View File

@ -226,13 +226,15 @@ public:
{ return ObRollupStatus::ROLLUP_COLLECTOR == rollup_adaptive_info_.rollup_status_; }
inline void set_force_push_down(bool force_push_down)
{ force_push_down_ = force_push_down; }
void set_group_by_outline_info(bool is_basic,
bool is_partition_wise,
void set_group_by_outline_info(DistAlgo algo,
bool use_hash_aggr,
bool has_push_down,
bool use_part_sort = false)
{
dist_method_ = is_basic ? T_DISTRIBUTE_BASIC : (is_partition_wise ? T_DISTRIBUTE_NONE : T_DISTRIBUTE_HASH);
dist_method_ = DistAlgo::DIST_BASIC_METHOD == algo ? T_DISTRIBUTE_BASIC :
(DistAlgo::DIST_PARTITION_WISE == algo ? T_DISTRIBUTE_NONE :
(DistAlgo::DIST_HASH_HASH == algo ? T_DISTRIBUTE_HASH : T_DISTRIBUTE_LOCAL));
use_hash_aggr_ = use_hash_aggr;
has_push_down_ = has_push_down;
use_part_sort_ = use_part_sort;
@ -267,6 +269,7 @@ private:
virtual int print_outline_data(PlanText &plan_text) override;
virtual int print_used_hint(PlanText &plan_text) override;
virtual int check_use_child_ordering(bool &used, int64_t &inherit_child_ordering_index)override;
virtual int compute_op_parallel_and_server_info() override;
private:
common::ObSEArray<ObRawExpr *, 8, common::ModulePageAllocator, true> group_exprs_;
common::ObSEArray<ObRawExpr *, 8, common::ModulePageAllocator, true> rollup_exprs_;

View File

@ -4669,6 +4669,7 @@ int ObLogPlan::candi_allocate_root_exchange()
OB_FAIL(allocate_exchange_as_top(best_candidates.at(i).plan_tree_, exch_info))) {
LOG_WARN("failed to allocate exchange as top", K(ret));
} else { /*do nothing*/ }
OPT_TRACE("generate root exchange for plan:", best_candidates.at(i).plan_tree_);
}
if (OB_SUCC(ret)) {
ObLogicalOperator *best_plan = NULL;
@ -4677,6 +4678,7 @@ int ObLogPlan::candi_allocate_root_exchange()
} else if (OB_FAIL(candidates_.get_best_plan(best_plan))) {
LOG_WARN("failed to get best plan", K(ret));
} else {
OPT_TRACE("choose best plan:", best_plan);
set_plan_root(best_plan);
best_plan->mark_is_plan_root();
get_optimizer_context().set_plan_type(best_plan->get_phy_plan_type(),
@ -5045,8 +5047,8 @@ int ObLogPlan::create_three_stage_group_plan(const ObIArray<ObRawExpr*> &group_b
OB_ISNULL(second_group_by = static_cast<ObLogGroupBy *>(top))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("second group by is invalid", K(ret), KP(top));
} else if (is_hash_rollup) { // do nothing
} else if (OB_FAIL(second_group_by->set_rollup_info(second_rollup_status,
} else if (!is_hash_rollup &&
OB_FAIL(second_group_by->set_rollup_info(second_rollup_status,
helper.rollup_id_expr_,
rd_second_sort_keys,
rd_second_ecd_sort_keys,
@ -5142,13 +5144,12 @@ int ObLogPlan::create_three_stage_group_plan(const ObIArray<ObRawExpr*> &group_b
OB_ISNULL(third_group_by = static_cast<ObLogGroupBy *>(top))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("second group by is invalid", K(ret), KP(top));
} else if (is_hash_rollup) {
} else if (OB_FAIL(third_group_by->set_rollup_info(third_rollup_status,
} else if (!is_hash_rollup &&
OB_FAIL(third_group_by->set_rollup_info(third_rollup_status,
helper.rollup_id_expr_))) {
LOG_WARN("failed to set rollup parallel info", K(ret));
}
if (OB_SUCC(ret)) {
third_group_by->set_group_by_outline_info(false, false, HASH_AGGREGATE == second_aggr_algo, true);
} else {
third_group_by->set_group_by_outline_info(DistAlgo::DIST_HASH_HASH, HASH_AGGREGATE == second_aggr_algo, true);
}
}
return ret;
@ -5355,32 +5356,143 @@ int ObLogPlan::inner_candi_allocate_scala_group_by(const ObIArray<ObAggFunRawExp
LOG_WARN("failed to get minimal cost candidate", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < best_plans.count(); i++) {
OPT_TRACE("generate scala group by for plan:", best_plans.at(i));
if (OB_FAIL(create_scala_group_plan(agg_items,
having_exprs,
groupby_helper,
best_plans.at(i).plan_tree_))) {
LOG_WARN("failed to create scala group by plan", K(ret));
} else if (NULL != best_plans.at(i).plan_tree_ &&
OB_FAIL(groupby_plans.push_back(best_plans.at(i)))) {
LOG_WARN("failed to push merge group by", K(ret));
} else { /*do nothing*/ }
OPT_TRACE("start to generate scala group by plan:");
uint64_t scala_group_dist_methods = 0;
if (OB_FAIL(get_distribute_group_by_method(best_plans.at(i).plan_tree_,
groupby_helper,
groupby_helper.distinct_exprs_,
scala_group_dist_methods))) {
LOG_WARN("failed to get distribute method", K(ret));
}
for (int64_t j = DistAlgo::DIST_BASIC_METHOD;
OB_SUCC(ret) && j < DistAlgo::DIST_MAX_JOIN_METHOD; j = (j << 1)) {
if (scala_group_dist_methods & j) {
DistAlgo scala_group_dist_algo = get_dist_algo(j);
CandidatePlan candi_plan = best_plans.at(i);
if (OB_FAIL(create_scala_group_plan(agg_items,
having_exprs,
groupby_helper,
candi_plan.plan_tree_,
scala_group_dist_algo))) {
LOG_WARN("failed to create scala group by plan", K(ret));
} else if (NULL != candi_plan.plan_tree_ &&
OB_FAIL(groupby_plans.push_back(candi_plan))) {
LOG_WARN("failed to push merge group by", K(ret));
} else {
OPT_TRACE("succeed to generate scala group by plan:", candi_plan.plan_tree_);
}
}
}
}
}
return ret;
}
int ObLogPlan::get_distribute_group_by_method(ObLogicalOperator *top,
GroupingOpHelper &groupby_helper,
const ObIArray<ObRawExpr*> &reduce_exprs,
uint64_t &group_dist_methods)
{
int ret = OB_SUCCESS;
bool is_partition_wise = false;
bool need_pull_to_local = false;
group_dist_methods = 0;
if (OB_ISNULL(top) || OB_ISNULL(get_optimizer_context().get_query_ctx())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(top), K(ret));
} else {
if (groupby_helper.allow_basic()) {
group_dist_methods |= DistAlgo::DIST_BASIC_METHOD;
} else {
OPT_TRACE("ignore basic group operator by hint");
}
if (groupby_helper.allow_partition_wise(get_optimizer_context().is_partition_wise_plan_enabled())) {
group_dist_methods |= DistAlgo::DIST_PARTITION_WISE;
} else {
OPT_TRACE("ignore partition wise group operator by hint");
}
if (groupby_helper.allow_dist_hash()) {
group_dist_methods |= DistAlgo::DIST_HASH_HASH;
} else {
OPT_TRACE("ignore hash group operator by hint");
}
if (groupby_helper.allow_pull_to_local()) {
group_dist_methods |= DistAlgo::DIST_PULL_TO_LOCAL;
} else {
OPT_TRACE("ignore pull to local group operator by hint");
}
if (groupby_helper.is_scalar_group_by_) {
group_dist_methods &= ~DistAlgo::DIST_PARTITION_WISE;
if (groupby_helper.distinct_aggr_items_.empty()) {
group_dist_methods &= ~DistAlgo::DIST_HASH_HASH;
} else if (!groupby_helper.distinct_aggr_items_.empty()) {
need_pull_to_local = true;
}
if (!groupby_helper.can_three_stage_pushdown_) {
group_dist_methods &= ~DistAlgo::DIST_HASH_HASH;
}
} else {
group_dist_methods &= ~DistAlgo::DIST_PULL_TO_LOCAL;
}
if (!top->is_distributed()) {
group_dist_methods &= ~DistAlgo::DIST_PARTITION_WISE;
group_dist_methods &= ~DistAlgo::DIST_HASH_HASH;
group_dist_methods &= ~DistAlgo::DIST_PULL_TO_LOCAL;
}
}
if (OB_SUCC(ret) && (group_dist_methods & DistAlgo::DIST_BASIC_METHOD)) {
if (top->is_distributed()) {
group_dist_methods &= ~DistAlgo::DIST_BASIC_METHOD;
OPT_TRACE("group operator will not use basic method");
} else {
group_dist_methods = DistAlgo::DIST_BASIC_METHOD;
OPT_TRACE("group operator will use basic method and prune other method");
}
}
if (OB_SUCC(ret) && (group_dist_methods & DistAlgo::DIST_PARTITION_WISE)) {
if (!reduce_exprs.empty() &&
OB_FAIL(top->check_sharding_compatible_with_reduce_expr(reduce_exprs,
is_partition_wise))) {
LOG_WARN("failed to check if sharding compatible with distinct expr", K(ret));
} else if (is_partition_wise && top->is_parallel_more_than_part_cnt() &&
get_optimizer_context().get_query_ctx()->check_opt_compat_version(COMPAT_VERSION_4_3_5)) {
OPT_TRACE("group operator will use partition wise method");
} else if (is_partition_wise) {
group_dist_methods = DistAlgo::DIST_PARTITION_WISE;
OPT_TRACE("group operator will use partition wise method and prune other method");
} else {
group_dist_methods &= ~DistAlgo::DIST_PARTITION_WISE;
OPT_TRACE("group operator will not use partition wise method");
}
}
if (OB_SUCC(ret) && (group_dist_methods & DistAlgo::DIST_HASH_HASH)) {
OPT_TRACE("group operator will use hash method");
}
if (OB_SUCC(ret) &&
!need_pull_to_local &&
DistAlgo::DIST_PULL_TO_LOCAL != group_dist_methods) {
group_dist_methods &= ~DistAlgo::DIST_PULL_TO_LOCAL;
OPT_TRACE("group operator will not use pull to local method");
}
return ret;
}
int ObLogPlan::create_scala_group_plan(const ObIArray<ObAggFunRawExpr*> &aggr_items,
const ObIArray<ObRawExpr*> &having_exprs,
GroupingOpHelper &groupby_helper,
ObLogicalOperator *&top)
ObLogicalOperator *&top,
const DistAlgo algo)
{
int ret = OB_SUCCESS;
bool is_partition_wise = false;
ObSEArray<ObRawExpr*, 1> dummy_exprs;
ObSEArray<ObAggFunRawExpr*, 1> dummy_aggr;
double origin_child_card = 0.0;
ObExchangeInfo exch_info;
OPT_TRACE("generate scala group plan with method:", ob_dist_algo_str(algo));
if (OB_ISNULL(top)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get unexpected null", K(ret));
@ -5390,29 +5502,21 @@ int ObLogPlan::create_scala_group_plan(const ObIArray<ObAggFunRawExpr*> &aggr_it
groupby_helper.pushdown_groupby_columns_.empty() ? aggr_items : dummy_aggr,
groupby_helper.pushdown_groupby_columns_))) {
LOG_WARN("failed to push group by into table scan", K(ret));
} else if (!top->is_distributed()) {
OPT_TRACE("generate scala group plan without pushdown");
if (!groupby_helper.allow_basic()) {
top = NULL;
OPT_TRACE("ignore basic scala group by hint");
} else if (OB_FAIL(allocate_scala_group_by_as_top(top,
aggr_items,
having_exprs,
groupby_helper.is_from_povit_,
origin_child_card))) {
} else if (DistAlgo::DIST_BASIC_METHOD == algo) {
if (OB_FAIL(allocate_scala_group_by_as_top(top,
aggr_items,
having_exprs,
groupby_helper.is_from_povit_,
origin_child_card))) {
LOG_WARN("failed to allocate scala group by as top", K(ret));
} else {
static_cast<ObLogGroupBy*>(top)->set_group_by_outline_info(true, false, false, false);
static_cast<ObLogGroupBy*>(top)->set_group_by_outline_info(algo, false, false);
}
} else if (!groupby_helper.distinct_exprs_.empty() &&
OB_FAIL(top->check_sharding_compatible_with_reduce_expr(groupby_helper.distinct_exprs_,
is_partition_wise))) {
LOG_WARN("failed to check if sharding compatible with distinct expr", K(ret));
} else if (groupby_helper.can_three_stage_pushdown_ &&
(!is_partition_wise || !get_optimizer_context().is_partition_wise_plan_enabled()) ) {
OPT_TRACE("generate three stage group plan");
if (NULL == groupby_helper.aggr_code_expr_ &&
OB_FAIL(prepare_three_stage_info(dummy_exprs, dummy_exprs, groupby_helper))) {
} else if (DistAlgo::DIST_HASH_HASH == algo) {
if (!groupby_helper.can_three_stage_pushdown_) {
OPT_TRACE("can not do three stage pushdown, ignore hash dist plan");
} else if (NULL == groupby_helper.aggr_code_expr_ &&
OB_FAIL(prepare_three_stage_info(dummy_exprs, dummy_exprs, groupby_helper))) {
LOG_WARN("failed to prepare three stage info", K(ret));
} else if (OB_FAIL(create_three_stage_group_plan(dummy_exprs,
dummy_exprs,
@ -5421,22 +5525,27 @@ int ObLogPlan::create_scala_group_plan(const ObIArray<ObAggFunRawExpr*> &aggr_it
top))) {
LOG_WARN("failed to create three stage group plan", K(ret));
}
} else {
if ((groupby_helper.can_basic_pushdown_ || is_partition_wise) &&
OB_FAIL(allocate_group_by_as_top(top,
AggregateAlgo::MERGE_AGGREGATE,
dummy_exprs,
dummy_exprs,
aggr_items,
dummy_exprs,
groupby_helper.is_from_povit_,
groupby_helper.group_ndv_,
origin_child_card,
is_partition_wise,
true,
is_partition_wise,
ObRollupStatus::NONE_ROLLUP,
true))) {
} else if (DistAlgo::DIST_PULL_TO_LOCAL == algo) {
bool can_pushdown_distinct_aggr = false;
if (!groupby_helper.distinct_exprs_.empty() &&
OB_FAIL(top->check_sharding_compatible_with_reduce_expr(groupby_helper.distinct_exprs_,
can_pushdown_distinct_aggr))) {
LOG_WARN("failed to check if sharding compatible with distinct expr", K(ret));
} else if ((groupby_helper.can_basic_pushdown_ || can_pushdown_distinct_aggr) &&
OB_FAIL(allocate_group_by_as_top(top,
AggregateAlgo::MERGE_AGGREGATE,
dummy_exprs,
dummy_exprs,
aggr_items,
dummy_exprs,
groupby_helper.is_from_povit_,
groupby_helper.group_ndv_,
origin_child_card,
can_pushdown_distinct_aggr,
true,
can_pushdown_distinct_aggr,
ObRollupStatus::NONE_ROLLUP,
true))) {
LOG_WARN("failed to allocate scala group by as top", K(ret));
} else if (OB_FAIL(allocate_exchange_as_top(top, exch_info))) {
LOG_WARN("failed to allocate exchange as top", K(ret));
@ -5447,10 +5556,13 @@ int ObLogPlan::create_scala_group_plan(const ObIArray<ObAggFunRawExpr*> &aggr_it
origin_child_card))) {
LOG_WARN("failed to allocate scala group by as top", K(ret));
} else {
static_cast<ObLogGroupBy*>(top)->set_group_by_outline_info(false, is_partition_wise, false, groupby_helper.can_basic_pushdown_ || is_partition_wise);
bool has_push_down_group = groupby_helper.can_basic_pushdown_ || can_pushdown_distinct_aggr;
static_cast<ObLogGroupBy*>(top)->set_group_by_outline_info(algo, false, has_push_down_group);
}
} else {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected dist method", K(algo), K(ret));
}
return ret;
}
@ -5614,11 +5726,14 @@ int ObLogPlan::init_groupby_helper(const ObIArray<ObRawExpr*> &group_exprs,
} else if (OB_FAIL(append(group_rollup_exprs, group_exprs))
|| OB_FAIL(append(group_rollup_exprs, rollup_exprs))) {
LOG_WARN("failed to append group rollup exprs", K(ret));
} else if (OB_FAIL(get_log_plan_hint().get_aggregation_info(
groupby_helper.force_use_hash_, groupby_helper.force_use_merge_,
groupby_helper.force_part_sort_, groupby_helper.force_normal_sort_,
groupby_helper.force_basic_, groupby_helper.force_partition_wise_,
groupby_helper.force_dist_hash_))) {
} else if (OB_FAIL(get_log_plan_hint().get_aggregation_info(groupby_helper.force_use_hash_,
groupby_helper.force_use_merge_,
groupby_helper.force_part_sort_,
groupby_helper.force_normal_sort_,
groupby_helper.force_basic_,
groupby_helper.force_partition_wise_,
groupby_helper.force_dist_hash_,
groupby_helper.force_pull_to_local_))) {
LOG_WARN("failed to get aggregation info from hint", K(ret));
} else if (OB_FAIL(check_storage_groupby_pushdown(aggr_items, group_exprs,
groupby_helper.pushdown_groupby_columns_,
@ -7936,7 +8051,6 @@ int ObLogPlan::candi_allocate_subplan_filter(const ObIArray<ObRawExpr*> &subquer
ObSEArray<ObExecParamRawExpr *, 4> params;
ObSEArray<ObExecParamRawExpr *, 4> onetime_exprs;
ObSEArray<ObRawExpr *, 4> new_filters;
OPT_TRACE_TITLE("start generate subplan filter");
ObSEArray<ObQueryRefRawExpr*, 4> subqueries;
ObSEArray<ObRawExpr*, 4> nested_subquery_exprs;
if (OB_FAIL(ObTransformUtils::extract_query_ref_expr(subquery_exprs, subqueries, false))) {
@ -8449,6 +8563,9 @@ int ObLogPlan::generate_subplan_filter_info(const ObIArray<ObRawExpr *> &subquer
LOG_WARN("failed to append query refs", K(ret));
}
}
if (!candi_query_refs.empty()) {
OPT_TRACE_TITLE("start generate subplan filter");
}
for (int64_t i = 0; OB_SUCC(ret) && i < candi_query_refs.count(); ++i) {
SubPlanInfo *info = NULL;
if (OB_FAIL(get_subplan(candi_query_refs.at(i), info))) {
@ -8723,6 +8840,7 @@ int ObLogPlan::create_subplan_filter_plan(ObLogicalOperator *&top,
LOG_WARN("failed to allocate subplan filter as top", K(ret));
} else { /*do nothing*/
}
OPT_TRACE("succeed to generate subplan filter plan:", top);
return ret;
}

View File

@ -469,6 +469,10 @@ public:
force_use_merge_(false),
force_part_sort_(false),
force_normal_sort_(false),
force_basic_(false),
force_partition_wise_(false),
force_dist_hash_(false),
force_pull_to_local_(false),
is_scalar_group_by_(false),
distinct_exprs_(),
aggr_code_expr_(NULL),
@ -488,14 +492,21 @@ public:
void set_ignore_hint() { ignore_hint_ = true; }
void clear_ignore_hint() { ignore_hint_ = false; }
inline bool allow_basic() const { return ignore_hint_ || (!force_partition_wise_ && !force_dist_hash_); }
inline bool allow_dist_hash() const { return ignore_hint_ || (!force_basic_ && !force_partition_wise_); }
inline bool allow_basic() const { return ignore_hint_ || (!force_partition_wise_ &&
!force_dist_hash_ &&
!force_pull_to_local_); }
inline bool allow_dist_hash() const { return ignore_hint_ || (!force_basic_ &&
!force_partition_wise_ &&
!force_pull_to_local_); }
inline bool allow_partition_wise(bool enable_partition_wise_plan) const
{
bool disable_by_rule = !enable_partition_wise_plan && optimizer_features_enable_version_ > COMPAT_VERSION_4_3_2;
return ignore_hint_ ? !disable_by_rule
: (disable_by_rule ? force_partition_wise_ : (!force_basic_ && !force_dist_hash_));
: (disable_by_rule ? force_partition_wise_ : (!force_basic_ && !force_dist_hash_ && !force_pull_to_local_));
}
inline bool allow_pull_to_local() const { return ignore_hint_ || (!force_basic_ &&
!force_dist_hash_ &&
!force_partition_wise_); }
inline void reset_three_stage_info()
{
@ -515,6 +526,7 @@ public:
bool force_basic_; // pq hint force use basic plan
bool force_partition_wise_; // pq hint force use partition wise plan
bool force_dist_hash_; // pq hint force use hash distributed method plan
bool force_pull_to_local_;
bool is_scalar_group_by_;
bool is_from_povit_;
bool ignore_hint_;
@ -553,6 +565,7 @@ public:
K_(force_basic),
K_(force_partition_wise),
K_(force_dist_hash),
K_(force_pull_to_local),
K_(is_scalar_group_by),
K_(is_from_povit),
K_(ignore_hint),
@ -734,6 +747,10 @@ public:
GroupingOpHelper &groupby_helper,
ObIArray<CandidatePlan> &groupby_plans);
int get_distribute_group_by_method(ObLogicalOperator *top,
GroupingOpHelper &groupby_helper,
const ObIArray<ObRawExpr*> &reduce_exprs,
uint64_t &group_dist_methods);
int prepare_three_stage_info(const ObIArray<ObRawExpr *> &group_by_exprs,
const ObIArray<ObRawExpr *> &rollup_exprs,
GroupingOpHelper &helper);
@ -772,7 +789,8 @@ public:
int create_scala_group_plan(const ObIArray<ObAggFunRawExpr*> &agg_items,
const ObIArray<ObRawExpr*> &having_exprs,
GroupingOpHelper &groupby_helper,
ObLogicalOperator *&top);
ObLogicalOperator *&top,
const DistAlgo algo);
int check_can_pullup_gi(ObLogicalOperator &top,
bool is_partition_wise,

View File

@ -925,6 +925,54 @@ int ObLogSet::compute_op_parallel_and_server_info()
int ret = OB_SUCCESS;
if (OB_FAIL(compute_normal_multi_child_parallel_and_server_info())) {
LOG_WARN("failed to compute multi child parallel and server info", K(ret), K(get_distributed_algo()));
} else if (DistAlgo::DIST_PARTITION_WISE == get_distributed_algo()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
} else if (DistAlgo::DIST_SET_PARTITION_WISE == get_distributed_algo()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_server_list().count() > 0 &&
get_parallel() > child->get_server_list().count()) {
int64_t reduce_parallel = child->get_server_list().count();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
} else if (DistAlgo::DIST_PARTITION_NONE == get_distributed_algo()) {
ObLogicalOperator *child = get_child(second_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
} else if (DistAlgo::DIST_NONE_PARTITION == get_distributed_algo()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
}
return ret;
}

View File

@ -829,6 +829,19 @@ int ObLogSubPlanFilter::compute_op_parallel_and_server_info()
if (OB_FAIL(get_server_list().push_back(all_server_list))) {
LOG_WARN("failed to assign all server list", K(ret));
}
} else if (DistAlgo::DIST_PARTITION_WISE == get_distributed_algo() ||
DistAlgo::DIST_PARTITION_NONE == get_distributed_algo()) {
ObLogicalOperator *child = get_child(second_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
}
return ret;
}

View File

@ -344,6 +344,9 @@ int ObLogWindowFunction::est_cost()
double child_card = 0.0;
double child_width = 0.0;
double sel = 0.0;
EstimateCostInfo param;
param.need_parallel_ = get_parallel();
double child_cost = 0;
if (OB_ISNULL(get_plan()) ||
OB_ISNULL(first_child = get_child(ObLogicalOperator::first_child))) {
ret = OB_ERR_UNEXPECTED;
@ -352,8 +355,13 @@ int ObLogWindowFunction::est_cost()
LOG_WARN("get child est info failed", K(ret));
} else if (OB_FAIL(inner_est_cost(child_card, child_width, op_cost_))) {
LOG_WARN("calculate cost of window function failed", K(ret));
} else if (need_re_est_child_cost() &&
OB_FAIL(SMART_CALL(first_child->re_est_cost(param, child_card, child_cost)))) {
LOG_WARN("failed to re est child cost", K(ret));
} else if (!need_re_est_child_cost() &&
OB_FALSE_IT(child_cost=first_child->get_cost())) {
} else {
set_cost(first_child->get_cost() + op_cost_);
set_cost(child_cost + op_cost_);
set_op_cost(op_cost_);
set_card(child_card * sel);
}
@ -688,4 +696,25 @@ int ObLogWindowFunction::check_use_child_ordering(bool &used, int64_t &inherit_c
used = false;
}
return ret;
}
int ObLogWindowFunction::compute_op_parallel_and_server_info()
{
int ret = OB_SUCCESS;
if (OB_FAIL(ObLogicalOperator::compute_op_parallel_and_server_info())) {
LOG_WARN("failed to compute parallel and server info", K(ret));
} else if (is_partition_wise()) {
ObLogicalOperator *child = get_child(first_child);
if (OB_ISNULL(child)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpect null child op", K(ret));
} else if (child->get_part_cnt() > 0 &&
get_parallel() > child->get_part_cnt()) {
int64_t reduce_parallel = child->get_part_cnt();
reduce_parallel = reduce_parallel < 2 ? 2 : reduce_parallel;
set_parallel(reduce_parallel);
need_re_est_child_cost_ = true;
}
}
return ret;
}

View File

@ -121,6 +121,7 @@ namespace sql
int est_window_function_part_cnt();
virtual int compute_property() override;
virtual int check_use_child_ordering(bool &used, int64_t &inherit_child_ordering_index)override;
virtual int compute_op_parallel_and_server_info() override;
private:
ObSEArray<ObWinFunRawExpr *, 4, common::ModulePageAllocator, true> win_exprs_;

View File

@ -442,7 +442,8 @@ ObLogicalOperator::ObLogicalOperator(ObLogPlan &plan)
inherit_sharding_index_(-1),
need_osg_merge_(false),
max_px_thread_branch_(OB_INVALID_INDEX),
max_px_group_branch_(OB_INVALID_INDEX)
max_px_group_branch_(OB_INVALID_INDEX),
need_re_est_child_cost_(false)
{
}
@ -6742,3 +6743,23 @@ int ObLogicalOperator::check_contain_dist_das(const ObIArray<ObAddr> &exec_serve
}
return ret;
}
bool ObLogicalOperator::is_parallel_more_than_part_cnt() const
{
if (NULL == strong_sharding_) {
return false;
} else if (strong_sharding_->get_part_cnt() < 1) {
return false;
} else {
return get_parallel() > strong_sharding_->get_part_cnt();
}
}
int64_t ObLogicalOperator::get_part_cnt() const
{
if (NULL == strong_sharding_) {
return 0;
} else {
return strong_sharding_->get_part_cnt();
}
}

View File

@ -1543,7 +1543,7 @@ public:
{
inherit_sharding_index_ = inherit_sharding_index;
}
inline bool need_re_est_child_cost() const { return need_re_est_child_cost_; }
inline bool need_osg_merge() const { return need_osg_merge_; }
inline void set_need_osg_merge(bool v)
{
@ -1633,6 +1633,8 @@ public:
int get_part_column_exprs(const uint64_t table_id,
const uint64_t ref_table_id,
ObIArray<ObRawExpr *> &part_cols) const;
bool is_parallel_more_than_part_cnt() const;
int64_t get_part_cnt() const;
inline void set_parallel(int64_t parallel) { parallel_ = parallel; }
inline int64_t get_parallel() const { return parallel_; }
inline void set_op_parallel_rule(OpParallelRule op_parallel_rule) { op_parallel_rule_ = op_parallel_rule; }
@ -1977,6 +1979,7 @@ protected:
bool need_osg_merge_;
int64_t max_px_thread_branch_;
int64_t max_px_group_branch_;
bool need_re_est_child_cost_;
};
template <typename Allocator>

File diff suppressed because it is too large Load Diff

View File

@ -120,13 +120,26 @@ private:
GroupingOpHelper &groupby_helper,
ObIArray<CandidatePlan> &groupby_plans);
int allocate_three_stage_group_by(const ObIArray<ObRawExpr*> &reduce_exprs,
const ObIArray<ObRawExpr*> &group_by_exprs,
const ObIArray<ObOrderDirection> &group_directions,
const ObIArray<ObRawExpr*> &rollup_exprs,
const ObIArray<ObOrderDirection> &rollup_directions,
const ObIArray<ObAggFunRawExpr*> &aggr_items,
const ObIArray<ObRawExpr*> &having_exprs,
GroupingOpHelper &groupby_helper,
const DistAlgo algo,
CandidatePlan &groupby_plan,
ObIArray<CandidatePlan> &groupby_plans);
int create_hash_group_plan(const ObIArray<ObRawExpr*> &reduce_exprs,
const ObIArray<ObRawExpr*> &group_by_exprs,
const ObIArray<ObRawExpr*> &rollup_exprs,
const ObIArray<ObAggFunRawExpr*> &aggr_items,
const ObIArray<ObRawExpr*> &having_exprs,
GroupingOpHelper &groupby_helper,
ObLogicalOperator *&top);
ObLogicalOperator *&top,
const DistAlgo algo);
int allocate_topk_for_hash_group_plan(ObLogicalOperator *&top);
@ -150,6 +163,7 @@ private:
const ObIArray<ObRawExpr *> &reduce_exprs,
const ObIArray<ObRawExpr *> &rollup_exprs,
GroupingOpHelper &groupby_helper,
const DistAlgo algo,
bool &is_needed);
int create_rollup_pushdown_plan(const ObIArray<ObRawExpr*> &group_by_exprs,
@ -157,6 +171,7 @@ private:
const ObIArray<ObAggFunRawExpr*> &aggr_items,
const ObIArray<ObRawExpr*> &having_exprs,
GroupingOpHelper &groupby_helper,
const DistAlgo algo,
ObLogicalOperator *&top);
int create_merge_group_plan(const ObIArray<ObRawExpr*> &reduce_exprs,
@ -171,6 +186,7 @@ private:
ObIArray<CandidatePlan> &candidate_plans,
bool part_sort_valid,
bool normal_sort_valid,
const DistAlgo algo,
bool can_ignore_merge = false);
int generate_merge_group_sort_keys(ObLogicalOperator *top,
@ -197,15 +213,23 @@ private:
const ObIArray<ObRawExpr*> &distinct_exprs,
ObIArray<CandidatePlan> &distinct_plans);
int get_distribute_distinct_method(ObLogicalOperator *top,
const GroupingOpHelper &distinct_helper,
const ObIArray<ObRawExpr*> &reduce_exprs,
uint64_t &distinct_dist_methods);
int create_hash_distinct_plan(ObLogicalOperator *&top,
const GroupingOpHelper &distinct_helper,
const ObIArray<ObRawExpr*> &reduce_exprs,
const ObIArray<ObRawExpr*> &distinct_exprs);
const ObIArray<ObRawExpr*> &distinct_exprs,
const DistAlgo algo);
int create_merge_distinct_plan(ObLogicalOperator *&top,
const GroupingOpHelper &distinct_helper,
const ObIArray<ObRawExpr*> &reduce_exprs,
const ObIArray<ObRawExpr*> &distinct_exprs,
const DistAlgo algo,
bool &ignore_plan,
bool can_ignore_merge_plan = false);
int allocate_distinct_as_top(ObLogicalOperator *&top,
@ -237,8 +261,19 @@ private:
ObIArray<ObLogicalOperator*> &best_child_ops,
ObIArray<ObLogicalOperator*> &best_das_child_ops,
ObIArray<ObLogicalOperator*> &best_px_child_ops);
int create_union_all_plan(const ObIArray<ObLogicalOperator*> &child_plans,
const bool ignore_hint,
int inner_generate_union_all_plan(const ObIArray<ObLogicalOperator*> &child_ops,
const bool ignore_hint,
ObIArray<CandidatePlan> &all_plans);
int get_distibute_union_all_method(const ObIArray<ObLogicalOperator*> &child_ops,
const bool ignore_hint,
uint64_t &set_dist_methods,
ObLogicalOperator* &largest_op);
int create_union_all_plan(const ObIArray<ObLogicalOperator*> &child_ops,
const DistAlgo dist_set_method,
ObLogicalOperator* largest_op,
ObLogicalOperator *&top);
int check_if_union_all_match_partition_wise(const ObIArray<ObLogicalOperator*> &child_ops,
@ -269,10 +304,14 @@ private:
const ObIArray<OrderItem> &order_items,
const bool ignore_hint,
ObIArray<CandidatePlan> &all_plans);
int get_recursive_union_all_distribute_method(ObLogicalOperator *left_child,
ObLogicalOperator *right_child,
const bool ignore_hint,
DistAlgo &dist_set_method);
int create_recursive_union_all_plan(ObLogicalOperator *left_child,
ObLogicalOperator *right_child,
const ObIArray<OrderItem> &candi_order_items,
const bool ignore_hint,
DistAlgo dist_set_method,
ObLogicalOperator *&top);
int allocate_recursive_union_all_as_top(ObLogicalOperator *left_child,
@ -616,9 +655,16 @@ private:
int create_one_window_function(CandidatePlan &candidate_plan,
const WinFuncOpHelper &win_func_helper,
ObIArray<CandidatePlan> &all_plans);
int get_distribute_window_method(ObLogicalOperator *top,
const WinFuncOpHelper &win_func_helper,
uint64_t &win_dist_methods,
bool &single_part_parallel,
bool &is_partition_wise);
int create_none_dist_win_func(ObLogicalOperator *top,
const WinFuncOpHelper &win_func_helper,
const int64_t need_sort,
const bool need_sort,
const bool single_part_parallel,
const bool is_partition_wise,
const int64_t prefix_pos,
const int64_t part_cnt,
ObIArray<CandidatePlan> &all_plans);
@ -638,7 +684,7 @@ private:
int create_hash_dist_win_func(ObLogicalOperator *top,
const WinFuncOpHelper &win_func_helper,
const int64_t need_sort,
const bool need_sort,
const int64_t prefix_pos,
const int64_t part_cnt,
ObIArray<CandidatePlan> &all_plans);
@ -956,6 +1002,8 @@ int generate_window_functions_plan(WinFuncOpHelper &win_func_helper,
GroupingOpHelper &groupby_helper,
ObLogicalOperator *&top,
bool use_part_sort,
const DistAlgo algo,
bool &ignore_plan,
bool can_ignore_merge = false);
int check_external_table_scan(ObSelectStmt *stmt, bool &has_external_table);

View File

@ -2788,6 +2788,7 @@ const char *ObPQHint::get_dist_method_str(ObItemType dist_method)
case T_DISTRIBUTE_BASIC: return "BASIC";
case T_DISTRIBUTE_NONE: return "NONE";
case T_DISTRIBUTE_HASH: return "HASH";
case T_DISTRIBUTE_LOCAL: return "LOCAL";
default: return NULL;
}
return NULL;

View File

@ -1223,6 +1223,7 @@ class ObPQHint : public ObOptHint
inline bool is_force_basic() const { return T_DISTRIBUTE_BASIC == dist_method_; }
inline bool is_force_partition_wise() const { return T_DISTRIBUTE_NONE == dist_method_; }
inline bool is_force_dist_hash() const { return T_DISTRIBUTE_HASH == dist_method_; }
inline bool is_force_pull_to_local() const { return T_DISTRIBUTE_LOCAL == dist_method_; }
INHERIT_TO_STRING_KV("ObHint", ObHint, K_(dist_method));
private:

View File

@ -1761,7 +1761,8 @@ int ObLogPlanHint::get_aggregation_info(bool &force_use_hash,
bool &force_normal_sort,
bool &force_basic,
bool &force_partition_wise,
bool &force_dist_hash) const
bool &force_dist_hash,
bool &force_pull_to_local) const
{
int ret = OB_SUCCESS;
force_use_hash = false;
@ -1792,10 +1793,12 @@ int ObLogPlanHint::get_aggregation_info(bool &force_use_hash,
force_basic = pq_hint->is_force_basic();
force_partition_wise = pq_hint->is_force_partition_wise();
force_dist_hash = pq_hint->is_force_dist_hash();
force_pull_to_local = pq_hint->is_force_pull_to_local();
} else if (is_outline_data_) {
force_basic = true;
force_partition_wise = false;
force_dist_hash = false;
force_pull_to_local = false;
}
return ret;
}

View File

@ -482,7 +482,8 @@ struct ObLogPlanHint
bool &force_normal_sort,
bool &force_basic,
bool &force_partition_wise,
bool &force_dist_hash) const;
bool &force_dist_hash,
bool &force_pull_to_local) const;
int get_distinct_info(bool &force_use_hash,
bool &force_use_merge,
bool &force_basic,

View File

@ -148,84 +148,81 @@ Outputs & filters:
range_key([r.__pk_increment]), range(MIN ; MAX)always true
explain select * from cb_dep_acctbal_54 s, cb_dep_acct_54 a where s.acctnbr = a.acctnbr and ((a.curracctstatcd != 'CLS' and exists(select 1 from cb_dep_acctbal_54 r where r.acctnbr = s.acctnbr)) or (a.curracctstatcd = 'CLS' and exists(select 1 from cb_dep_rxtnbal_54 r where r.acctnbr = a.acctnbr)));
Query Plan
===========================================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
---------------------------------------------------------------------------
|0 |MERGE UNION DISTINCT | |2 |38 |
|1 |├─MERGE JOIN | |1 |18 |
|2 |│ ├─TABLE FULL SCAN |s |3 |3 |
|3 |│ └─SORT | |1 |16 |
|4 |│ └─HASH JOIN | |1 |16 |
|5 |│ ├─TABLE FULL SCAN |r |1 |3 |
|6 |│ └─PX COORDINATOR | |2 |13 |
|7 |│ └─EXCHANGE OUT DISTR |:EX10000|2 |12 |
|8 |│ └─PX PARTITION ITERATOR | |2 |11 |
|9 |│ └─TABLE FULL SCAN |a |2 |11 |
|10|└─SORT | |1 |20 |
|11| └─HASH JOIN | |1 |20 |
|12| ├─TABLE FULL SCAN |s |3 |3 |
|13| └─PX COORDINATOR | |1 |17 |
|14| └─EXCHANGE OUT DISTR |:EX20001|1 |16 |
|15| └─HASH JOIN | |1 |15 |
|16| ├─PX PARTITION ITERATOR | |1 |11 |
|17| │ └─TABLE FULL SCAN |a |1 |11 |
|18| └─EXCHANGE IN DISTR | |3 |4 |
|19| └─EXCHANGE OUT DISTR (PKEY)|:EX20000|3 |4 |
|20| └─TABLE FULL SCAN |r |3 |3 |
===========================================================================
===============================================================================
|ID|OPERATOR |NAME |EST.ROWS|EST.TIME(us)|
-------------------------------------------------------------------------------
|0 |PX COORDINATOR | |2 |43 |
|1 |└─EXCHANGE OUT DISTR |:EX10003|2 |42 |
|2 | └─HASH UNION DISTINCT | |2 |39 |
|3 | ├─HASH JOIN | |1 |18 |
|4 | │ ├─PX PARTITION ITERATOR | |1 |11 |
|5 | │ │ └─TABLE FULL SCAN |a |1 |11 |
|6 | │ └─EXCHANGE IN DISTR | |3 |7 |
|7 | │ └─EXCHANGE OUT DISTR (PKEY) |:EX10000|3 |6 |
|8 | │ └─TABLE FULL SCAN |s |3 |3 |
|9 | └─EXCHANGE IN DISTR | |1 |21 |
|10| └─EXCHANGE OUT DISTR (PKEY) |:EX10002|1 |20 |
|11| └─HASH DISTINCT | |1 |18 |
|12| └─MERGE JOIN | |1 |18 |
|13| ├─SORT | |1 |16 |
|14| │ └─HASH JOIN | |1 |16 |
|15| │ ├─TABLE FULL SCAN |r |1 |3 |
|16| │ └─EXCHANGE IN DISTR | |2 |13 |
|17| │ └─EXCHANGE OUT DISTR |:EX10001|2 |12 |
|18| │ └─PX PARTITION ITERATOR| |2 |11 |
|19| │ └─TABLE FULL SCAN |a |2 |11 |
|20| └─TABLE FULL SCAN |s |3 |3 |
===============================================================================
Outputs & filters:
-------------------------------------
0 - output([UNION([1])], [UNION([2])], [UNION([3])], [UNION([4])], [UNION([5])]), filter(nil), rowset=16
1 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
0 - output([INTERNAL_FUNCTION(UNION([1]), UNION([2]), UNION([3]), UNION([4]), UNION([5]))]), filter(nil), rowset=16
1 - output([INTERNAL_FUNCTION(UNION([1]), UNION([2]), UNION([3]), UNION([4]), UNION([5]))]), filter(nil), rowset=16
dop=1
2 - output([UNION([1])], [UNION([2])], [UNION([3])], [UNION([4])], [UNION([5])]), filter(nil), rowset=16
3 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
equal_conds([s.acctnbr = a.acctnbr]), other_conds(nil)
merge_directions([ASC])
2 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
4 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
affinitize, force partition granule
5 - output([a.acctnbr], [a.curracctstatcd]), filter([a.curracctstatcd != 'CLS']), rowset=16
access([a.acctnbr], [a.curracctstatcd]), partitions(p[0-4])
is_index_back=false, is_global_index=false, filter_before_indexback[false],
range_key([a.acctnbr]), range(MIN ; MAX)always true
6 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
7 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
(#keys=1, [s.acctnbr]), is_single, dop=1
8 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
access([s.acctnbr], [s.balcatcd], [s.baltypcd]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([s.acctnbr], [s.balcatcd], [s.baltypcd]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true
3 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
9 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
10 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
(#keys=1, [s.acctnbr]), is_single, dop=1
11 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
distinct([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd])
12 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
equal_conds([s.acctnbr = a.acctnbr]), other_conds(nil)
merge_directions([ASC])
13 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
sort_keys([a.acctnbr, ASC])
4 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
14 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
equal_conds([cast(r.acctnbr, DECIMAL(-1, -1)) = cast(a.acctnbr, DECIMAL(20, 0))]), other_conds(nil)
5 - output([r.acctnbr]), filter(nil), rowset=16
15 - output([r.acctnbr]), filter(nil), rowset=16
access([r.acctnbr]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([r.__pk_increment]), range(MIN ; MAX)always true
6 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
7 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
16 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
17 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
dop=1
8 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
18 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
force partition granule
9 - output([a.acctnbr], [a.curracctstatcd]), filter([a.curracctstatcd = 'CLS']), rowset=16
19 - output([a.acctnbr], [a.curracctstatcd]), filter([a.curracctstatcd = 'CLS']), rowset=16
access([a.acctnbr], [a.curracctstatcd]), partitions(p[0-4])
is_index_back=false, is_global_index=false, filter_before_indexback[false],
range_key([a.acctnbr]), range(MIN ; MAX)always true
10 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
sort_keys([s.acctnbr, ASC], [s.balcatcd, ASC], [s.baltypcd, ASC])
11 - output([s.acctnbr], [s.balcatcd], [s.baltypcd], [a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
equal_conds([s.acctnbr = a.acctnbr]), other_conds(nil)
12 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
20 - output([s.acctnbr], [s.balcatcd], [s.baltypcd]), filter(nil), rowset=16
access([s.acctnbr], [s.balcatcd], [s.baltypcd]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([s.acctnbr], [s.balcatcd], [s.baltypcd]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true
13 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
14 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
dop=1
15 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
equal_conds([a.acctnbr = r.acctnbr]), other_conds(nil)
16 - output([a.acctnbr], [a.curracctstatcd]), filter(nil), rowset=16
affinitize, force partition granule
17 - output([a.acctnbr], [a.curracctstatcd]), filter([a.curracctstatcd != 'CLS']), rowset=16
access([a.acctnbr], [a.curracctstatcd]), partitions(p[0-4])
is_index_back=false, is_global_index=false, filter_before_indexback[false],
range_key([a.acctnbr]), range(MIN ; MAX)always true
18 - output([r.acctnbr]), filter(nil), rowset=16
19 - output([r.acctnbr]), filter(nil), rowset=16
(#keys=1, [r.acctnbr]), is_single, dop=1
20 - output([r.acctnbr]), filter(nil), rowset=16
access([r.acctnbr]), partitions(p0)
is_index_back=false, is_global_index=false,
range_key([r.acctnbr], [r.balcatcd], [r.baltypcd]), range(MIN,MIN,MIN ; MAX,MAX,MAX)always true
explain select * from cb_dep_acctbal_54 s, cb_dep_acct_54 a where s.acctnbr = a.acctnbr and (a.curracctstatcd != 'CLS' or (a.curracctstatcd = 'CLS' and exists(select 1 from cb_dep_rxtnbal_54 r where r.acctnbr = a.acctnbr))) group by s.acctnbr;
Query Plan
=========================================================================

View File

@ -352,7 +352,7 @@ int TestOpEngine::get_tested_op_from_string(const std::string &sql, bool vector_
option.with_tree_line_ = true;
ObSqlPlan sql_plan(log_plan->get_allocator());
ObSEArray<common::ObString, 64> plan_strs;
if (OB_FAIL(sql_plan.print_sql_plan(log_plan, EXPLAIN_EXTENDED_NOADDR, option, plan_strs))) {
if (OB_FAIL(sql_plan.print_sql_plan(log_plan->get_plan_root(), EXPLAIN_EXTENDED_NOADDR, option, plan_strs))) {
LOG_WARN("failed to store sql plan", K(ret));
} else {
LOG_INFO("Generate Logical plan:");