diff --git a/deps/oblib/src/lib/utility/ob_tracepoint_def.h b/deps/oblib/src/lib/utility/ob_tracepoint_def.h index 7bc83069e..3d0e4f0de 100644 --- a/deps/oblib/src/lib/utility/ob_tracepoint_def.h +++ b/deps/oblib/src/lib/utility/ob_tracepoint_def.h @@ -376,6 +376,7 @@ GLOBAL_ERRSIM_POINT_DEF(556, EN_GENERATE_RANDOM_PLAN, "Whether the optimizer gen GLOBAL_ERRSIM_POINT_DEF(557, EN_COALESCE_AGGR_IGNORE_COST, ""); GLOBAL_ERRSIM_POINT_DEF(558, EN_CHECK_REWRITE_ITER_CONVERGE, "Reporting error when rewrite iter nonconvergent"); GLOBAL_ERRSIM_POINT_DEF(559, EN_PRINT_CONSTRAINTS_INFO, "show constraints info when explain query plan"); +GLOBAL_ERRSIM_POINT_DEF(560, EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION, "Use px random shuffle even statistical information do not exist."); // 600-700 For PX use GLOBAL_ERRSIM_POINT_DEF(600, EN_PX_SQC_EXECUTE_FAILED, ""); diff --git a/src/sql/engine/px/ob_dfo.h b/src/sql/engine/px/ob_dfo.h index 536127202..dec62a4e3 100644 --- a/src/sql/engine/px/ob_dfo.h +++ b/src/sql/engine/px/ob_dfo.h @@ -489,6 +489,7 @@ public: root_op_spec_(nullptr), child_dfos_(), has_scan_(false), + has_das_(false), has_dml_op_(false), has_need_branch_id_op_(false), has_temp_scan_(false), @@ -560,6 +561,10 @@ public: inline void get_root(const ObOpSpec *&root) const { root = root_op_spec_; } inline void set_scan(bool has_scan) { has_scan_ = has_scan; } inline bool has_scan_op() const { return has_scan_; } + + inline void set_das(bool has_das) { has_das_ = has_das; } + + inline bool has_das_op() const { return has_das_; } inline void set_dml_op(bool has_dml_op) { has_dml_op_ = has_dml_op; } inline bool has_dml_op() { return has_dml_op_; } inline void set_need_branch_id_op(bool has_need_branch_id_op) { has_need_branch_id_op_ = has_need_branch_id_op; } @@ -775,6 +780,7 @@ private: const ObOpSpec *root_op_spec_; common::ObSEArray child_dfos_; bool has_scan_; // DFO 中包含至少一个 scan 算子,或者仅仅包含一个dml + bool has_das_; // DFO 中包含至少一个 das 算子 bool has_dml_op_; // 
DFO中可能包含一个dml bool has_need_branch_id_op_; // DFO 中有算子需要分配branch_id bool has_temp_scan_; diff --git a/src/sql/engine/px/ob_dfo_mgr.cpp b/src/sql/engine/px/ob_dfo_mgr.cpp index 6630dd427..f1554856b 100644 --- a/src/sql/engine/px/ob_dfo_mgr.cpp +++ b/src/sql/engine/px/ob_dfo_mgr.cpp @@ -489,7 +489,11 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx, ret = OB_ERR_UNEXPECTED; LOG_WARN("the first phy_op must be a coord op", K(ret), K(phy_op->type_)); } else if (phy_op->is_table_scan() && NULL != parent_dfo) { - parent_dfo->set_scan(true); + if (static_cast(phy_op)->use_dist_das()) { + parent_dfo->set_das(true); + } else { + parent_dfo->set_scan(true); + } parent_dfo->inc_tsc_op_cnt(); auto tsc_op = static_cast(phy_op); if (TableAccessType::HAS_USER_TABLE == px_coord_info.table_access_type_){ diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 44ff81dfa..97665dd39 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -1454,8 +1454,10 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx, } LOG_TRACE("alloc_by_data_distribution", K(parent)); } else if (parent.is_single()) { - // parent 可能是一个 scalar group by,会被标记为 is_local,此时 + // 常见于PDML场景,如果parent没有tsc,则中间parent DFO需要把数据从child dfo先拉到QC本地,再shuffle到上面的DFO + // 比如parent 可能是一个 scalar group by,会被标记为 is_local,此时 // 走 alloc_by_data_distribution,内部会分配一个 QC 本地线程来执行 + // 或者嵌套PX场景 if (OB_FAIL(ObPXServerAddrUtil::alloc_by_data_distribution( coord_info_.pruning_table_location_, exec_ctx, parent))) { LOG_WARN("fail alloc addr by data distribution", K(parent), K(ret)); @@ -1476,7 +1478,7 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx, LOG_WARN("alloc by child distribution failed", K(ret)); } } else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) { - LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret)); + LOG_WARN("fail alloc addr by random 
distribution", K(parent), K(child), K(ret)); } LOG_TRACE("alloc_by_child_distribution", K(child), K(parent)); } diff --git a/src/sql/optimizer/ob_join_order.cpp b/src/sql/optimizer/ob_join_order.cpp index 83146487a..0078caaf3 100644 --- a/src/sql/optimizer/ob_join_order.cpp +++ b/src/sql/optimizer/ob_join_order.cpp @@ -10580,14 +10580,23 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path, distributed_methods = DistAlgo::DIST_RANDOM_ALL; OPT_TRACE("plan will use random all method by hint"); } else { + int enable_px_random_shuffle_only_statistic_exist = (OB_E(EventTable::EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION) OB_SUCCESS); int64_t px_expected_work_count = 0; int64_t compute_parallel = left_path.parallel_; AccessPath *left_access_path = static_cast(&left_path); - const ObTableMetaInfo *table_meta_info = left_access_path->est_cost_info_.table_meta_info_; - LOG_TRACE("SPF random shuffle est table meta info", K(*table_meta_info)); - if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew( - &get_plan()->get_optimizer_context(), left_access_path->get_ref_table_id(), - table_meta_info, compute_parallel, px_expected_work_count))) { + ObTableMetaInfo *table_meta_info = NULL; + if (OB_ISNULL(table_meta_info = left_access_path->est_cost_info_.table_meta_info_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get unexpected null", KPC(table_meta_info), K(ret)); + } else if (OB_SUCC(enable_px_random_shuffle_only_statistic_exist) && + (!table_meta_info->has_opt_stat_ || table_meta_info->micro_block_count_ == 0)) { + // Whether to use PX Random Shuffle is based on statistic information, so if we don't have + // it, we just use normal NONE_ALL + distributed_methods &= ~DistAlgo::DIST_RANDOM_ALL; + OPT_TRACE("plan will not use random all because lack of statistic information"); + } else if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew( - &get_plan()->get_optimizer_context(), left_access_path->get_ref_table_id(), + 
table_meta_info, compute_parallel, px_expected_work_count))) { LOG_WARN("Fail to compute none_all nlj storage compute parallel skew", K(ret)); } else if (px_expected_work_count < compute_parallel) { // we have more compute resources, so we should add a random shuffle, not choose none_all diff --git a/src/sql/optimizer/ob_log_plan.cpp b/src/sql/optimizer/ob_log_plan.cpp index 3d9c99e19..b9f82e6bd 100644 --- a/src/sql/optimizer/ob_log_plan.cpp +++ b/src/sql/optimizer/ob_log_plan.cpp @@ -8227,16 +8227,26 @@ int ObLogPlan::get_subplan_filter_distributed_method(ObLogicalOperator *&top, distributed_methods = DistAlgo::DIST_RANDOM_ALL; OPT_TRACE("SPF will use random all method by hint"); } else { + int enable_px_random_shuffle_only_statistic_exist = (OB_E(EventTable::EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION) OB_SUCCESS); int64_t compute_parallel = top->get_parallel(); - ObLogTableScan *log_table_scan = static_cast(top); int64_t px_expected_work_count = 0; - const ObTableMetaInfo *table_meta_info = - log_table_scan->get_access_path()->est_cost_info_.table_meta_info_; - LOG_TRACE("SPF random shuffle est table meta info", K(*table_meta_info)); - - if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew( - &get_optimizer_context(), log_table_scan->get_ref_table_id(), table_meta_info, - compute_parallel, px_expected_work_count))) { + ObLogTableScan *log_table_scan = static_cast(top); + const AccessPath *ap = NULL; + const ObTableMetaInfo *table_meta_info = NULL; + if (OB_ISNULL(ap = log_table_scan->get_access_path()) + || OB_ISNULL(table_meta_info = ap->est_cost_info_.table_meta_info_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get unexpected null", KPC(ap), KPC(table_meta_info), K(ret)); + } else if (OB_SUCC(enable_px_random_shuffle_only_statistic_exist) && + (!table_meta_info->has_opt_stat_ || table_meta_info->micro_block_count_ == 0)) { + // Whether to use PX Random Shuffle is based on statistic information, so if we don't have + // it, we just 
use normal NONE_ALL + distributed_methods &= ~DIST_HASH_ALL; + distributed_methods &= ~DIST_RANDOM_ALL; + OPT_TRACE("plan will not use random all because lack of statistic information"); + } else if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew( + &get_optimizer_context(), log_table_scan->get_ref_table_id(), table_meta_info, + compute_parallel, px_expected_work_count))) { LOG_WARN("Fail to compute none_all spf storage compute parallel skew", K(ret)); } else if (px_expected_work_count < compute_parallel) { // we have more compute resources, so we should add a hash shuffle diff --git a/src/sql/optimizer/ob_optimizer_util.cpp b/src/sql/optimizer/ob_optimizer_util.cpp index ea6e2eec7..6b220969f 100644 --- a/src/sql/optimizer/ob_optimizer_util.cpp +++ b/src/sql/optimizer/ob_optimizer_util.cpp @@ -10095,36 +10095,45 @@ int ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(ObOptimizerCo int64_t &px_expected_work_count) { int ret = OB_SUCCESS; - - ObSqlSchemaGuard *schema_guard = opt_ctx->get_sql_schema_guard(); - const ObTableSchema *table_schema = NULL; - if (OB_NOT_NULL(schema_guard) - && OB_FAIL(schema_guard->get_table_schema(ref_table_id, table_schema))) { - LOG_WARN("failed to get table schema", K(ret)); - } else if (OB_UNLIKELY(NULL == table_schema)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("failed to get table schame", K(ret)); + if (OB_ISNULL(opt_ctx) || OB_ISNULL(esti_table_meta_info)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("get unexpected null", K(opt_ctx), KPC(esti_table_meta_info), K(ret)); } else { - ObParallelBlockRangeTaskParams params; - params.parallelism_ = compute_parallel; - params.expected_task_load_ = table_schema->get_tablet_size() / 1024 <= 0 ? 
sql::OB_EXPECTED_TASK_LOAD : table_schema->get_tablet_size() / 1024; - //convert from B -> KB - int64_t esti_table_size = (esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024; - double query_range_filter_ratio = esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_; - esti_table_size *= query_range_filter_ratio; - esti_table_size = MAX(0, esti_table_size); - int64_t esti_task_cnt_by_data_size = 0; - if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params, esti_table_size, - esti_task_cnt_by_data_size))) { - LOG_WARN("compute total task count failed", K(ret)); + ObSqlSchemaGuard *schema_guard = opt_ctx->get_sql_schema_guard(); + const ObTableSchema *table_schema = NULL; + if (OB_NOT_NULL(schema_guard) + && OB_FAIL(schema_guard->get_table_schema(ref_table_id, table_schema))) { + LOG_WARN("failed to get table schema", K(ret)); + } else if (OB_UNLIKELY(NULL == table_schema)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("failed to get table schame", K(ret)); } else { - //if table is so small, px still ensures at least one task per partition - if (esti_table_size == 0) { - px_expected_work_count = esti_table_meta_info->part_count_; + ObParallelBlockRangeTaskParams params; + params.parallelism_ = compute_parallel; + params.expected_task_load_ = table_schema->get_tablet_size() / 1024 <= 0 ? 
+ sql::OB_EXPECTED_TASK_LOAD : + table_schema->get_tablet_size() / 1024; + // convert from B -> KB + int64_t esti_table_size = + (esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024; + double query_range_filter_ratio = + esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_; + esti_table_size *= query_range_filter_ratio; + esti_table_size = MAX(0, esti_table_size); + int64_t esti_task_cnt_by_data_size = 0; + if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params, esti_table_size, + esti_task_cnt_by_data_size))) { + LOG_WARN("compute total task count failed", K(ret)); } else { - px_expected_work_count = esti_task_cnt_by_data_size; + // if table is so small, px still ensures at least one task per partition + if (esti_table_size == 0) { + px_expected_work_count = esti_table_meta_info->part_count_; + } else { + px_expected_work_count = esti_task_cnt_by_data_size; + } + LOG_TRACE("OPT: get nlj/spf none_all join bound parallel: ", K(esti_table_size), + K(esti_task_cnt_by_data_size), K(px_expected_work_count), K(compute_parallel)); } - LOG_TRACE("OPT: get nlj/spf none_all join bound parallel: ", K(esti_table_size), K(esti_task_cnt_by_data_size), K(px_expected_work_count), K(compute_parallel)); } }