Fix PX Random Shuffle memory explosion issue

This commit is contained in:
obdev 2024-11-08 09:44:22 +00:00 committed by ob-robot
parent 746591ca25
commit e151889100
7 changed files with 83 additions and 42 deletions

View File

@ -376,6 +376,7 @@ GLOBAL_ERRSIM_POINT_DEF(556, EN_GENERATE_RANDOM_PLAN, "Whether the optimizer gen
GLOBAL_ERRSIM_POINT_DEF(557, EN_COALESCE_AGGR_IGNORE_COST, "");
GLOBAL_ERRSIM_POINT_DEF(558, EN_CHECK_REWRITE_ITER_CONVERGE, "Reporting error when rewrite iter nonconvergent");
GLOBAL_ERRSIM_POINT_DEF(559, EN_PRINT_CONSTRAINTS_INFO, "show constraints info when explain query plan");
// Error-injection tracepoint: when enabled, allow PX random shuffle even if the
// optimizer has no statistics for the table (normally random shuffle is skipped
// in that case — see get_distributed_join_method / get_subplan_filter_distributed_method).
// Fix: description string had broken grammar ("even statistical information do not exist").
GLOBAL_ERRSIM_POINT_DEF(560, EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION, "Use px random shuffle even when statistical information does not exist.");
// 600-700 For PX use
GLOBAL_ERRSIM_POINT_DEF(600, EN_PX_SQC_EXECUTE_FAILED, "");

View File

@ -489,6 +489,7 @@ public:
root_op_spec_(nullptr),
child_dfos_(),
has_scan_(false),
has_das_(false),
has_dml_op_(false),
has_need_branch_id_op_(false),
has_temp_scan_(false),
@ -560,6 +561,10 @@ public:
inline void get_root(const ObOpSpec *&root) const { root = root_op_spec_; }
inline void set_scan(bool has_scan) { has_scan_ = has_scan; }
inline bool has_scan_op() const { return has_scan_; }
inline void set_das(bool has_das) { has_das_ = has_das; }
inline bool has_das_op() const { return has_das_; }
inline void set_dml_op(bool has_dml_op) { has_dml_op_ = has_dml_op; }
inline bool has_dml_op() { return has_dml_op_; }
inline void set_need_branch_id_op(bool has_need_branch_id_op) { has_need_branch_id_op_ = has_need_branch_id_op; }
@ -775,6 +780,7 @@ private:
const ObOpSpec *root_op_spec_;
common::ObSEArray<ObDfo *, 4> child_dfos_;
bool has_scan_; // DFO 中包含至少一个 scan 算子,或者仅仅包含一个dml
bool has_das_; // DFO 中包含至少一个 das 算子
bool has_dml_op_; // DFO中可能包含一个dml
bool has_need_branch_id_op_; // DFO 中有算子需要分配branch_id
bool has_temp_scan_;

View File

@ -489,7 +489,11 @@ int ObDfoMgr::do_split(ObExecContext &exec_ctx,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("the first phy_op must be a coord op", K(ret), K(phy_op->type_));
} else if (phy_op->is_table_scan() && NULL != parent_dfo) {
parent_dfo->set_scan(true);
if (static_cast<const ObTableScanSpec*>(phy_op)->use_dist_das()) {
parent_dfo->set_das(true);
} else {
parent_dfo->set_scan(true);
}
parent_dfo->inc_tsc_op_cnt();
auto tsc_op = static_cast<const ObTableScanSpec *>(phy_op);
if (TableAccessType::HAS_USER_TABLE == px_coord_info.table_access_type_){

View File

@ -1454,8 +1454,10 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
}
LOG_TRACE("alloc_by_data_distribution", K(parent));
} else if (parent.is_single()) {
// parent 可能是一个 scalar group by,会被标记为 is_local,此时
// 常见于PDML场景,如果parent没有tsc,则中间parent DFO需要把数据从child dfo先拉到QC本地,再shuffle到上面的DFO
// 比如parent 可能是一个 scalar group by,会被标记为 is_local,此时
// 走 alloc_by_data_distribution,内部会分配一个 QC 本地线程来执行
// 或者嵌套PX场景
if (OB_FAIL(ObPXServerAddrUtil::alloc_by_data_distribution(
coord_info_.pruning_table_location_, exec_ctx, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(ret));
@ -1476,7 +1478,7 @@ int ObParallelDfoScheduler::schedule_pair(ObExecContext &exec_ctx,
LOG_WARN("alloc by child distribution failed", K(ret));
}
} else if (OB_FAIL(ObPXServerAddrUtil::alloc_by_random_distribution(exec_ctx, child, parent))) {
LOG_WARN("fail alloc addr by data distribution", K(parent), K(child), K(ret));
LOG_WARN("fail alloc addr by random distribution", K(parent), K(child), K(ret));
}
LOG_TRACE("alloc_by_child_distribution", K(child), K(parent));
}

View File

@ -10580,14 +10580,23 @@ int ObJoinOrder::get_distributed_join_method(Path &left_path,
distributed_methods = DistAlgo::DIST_RANDOM_ALL;
OPT_TRACE("plan will use random all method by hint");
} else {
int enable_px_random_shuffle_only_statistic_exist = (OB_E(EventTable::EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION) OB_SUCCESS);
int64_t px_expected_work_count = 0;
int64_t compute_parallel = left_path.parallel_;
AccessPath *left_access_path = static_cast<AccessPath *>(&left_path);
const ObTableMetaInfo *table_meta_info = left_access_path->est_cost_info_.table_meta_info_;
LOG_TRACE("SPF random shuffle est table meta info", K(*table_meta_info));
if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(
&get_plan()->get_optimizer_context(), left_access_path->get_ref_table_id(),
table_meta_info, compute_parallel, px_expected_work_count))) {
ObTableMetaInfo *table_meta_info = NULL;
if (OB_ISNULL(table_meta_info = left_access_path->est_cost_info_.table_meta_info_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get unexpected null", KPC(table_meta_info), K(ret));
} else if (OB_SUCC(enable_px_random_shuffle_only_statistic_exist) &&
(!table_meta_info->has_opt_stat_ || table_meta_info->micro_block_count_ == 0)) {
// Whether to use PX Random Shuffle is based on statistical information, so if we
// don't have it, we just fall back to the normal NONE_ALL plan
distributed_methods &= ~DistAlgo::DIST_RANDOM_ALL;
OPT_TRACE("plan will not use random all because lack of statistic information");
} else if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(
&get_plan()->get_optimizer_context(), left_access_path->get_ref_table_id(),
table_meta_info, compute_parallel, px_expected_work_count))) {
LOG_WARN("Fail to compute none_all nlj storage compute parallel skew", K(ret));
} else if (px_expected_work_count < compute_parallel) {
// we have more compute resources, so we should add a random shuffle, not choose none_all

View File

@ -8227,16 +8227,26 @@ int ObLogPlan::get_subplan_filter_distributed_method(ObLogicalOperator *&top,
distributed_methods = DistAlgo::DIST_RANDOM_ALL;
OPT_TRACE("SPF will use random all method by hint");
} else {
int enable_px_random_shuffle_only_statistic_exist = (OB_E(EventTable::EN_PX_RANDOM_SHUFFLE_WITHOUT_STATISTIC_INFORMATION) OB_SUCCESS);
int64_t compute_parallel = top->get_parallel();
ObLogTableScan *log_table_scan = static_cast<ObLogTableScan *>(top);
int64_t px_expected_work_count = 0;
const ObTableMetaInfo *table_meta_info =
log_table_scan->get_access_path()->est_cost_info_.table_meta_info_;
LOG_TRACE("SPF random shuffle est table meta info", K(*table_meta_info));
if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(
&get_optimizer_context(), log_table_scan->get_ref_table_id(), table_meta_info,
compute_parallel, px_expected_work_count))) {
ObLogTableScan *log_table_scan = static_cast<ObLogTableScan *>(top);
const AccessPath *ap = NULL;
const ObTableMetaInfo *table_meta_info = NULL;
if (OB_ISNULL(ap = log_table_scan->get_access_path())
|| OB_ISNULL(table_meta_info = ap->est_cost_info_.table_meta_info_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get unexpected null", KPC(ap), KPC(table_meta_info), K(ret));
} else if (OB_SUCC(enable_px_random_shuffle_only_statistic_exist) &&
(!table_meta_info->has_opt_stat_ || table_meta_info->micro_block_count_ == 0)) {
// Whether to use PX Random Shuffle is based on statistical information, so if we
// don't have it, we just fall back to the normal NONE_ALL plan
distributed_methods &= ~DIST_HASH_ALL;
distributed_methods &= ~DIST_RANDOM_ALL;
OPT_TRACE("plan will not use random all because lack of statistic information");
} else if (OB_FAIL(ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(
&get_optimizer_context(), log_table_scan->get_ref_table_id(), table_meta_info,
compute_parallel, px_expected_work_count))) {
LOG_WARN("Fail to compute none_all spf storage compute parallel skew", K(ret));
} else if (px_expected_work_count < compute_parallel) {
// we have more compute resources, so we should add a hash shuffle

View File

@ -10095,36 +10095,45 @@ int ObOptimizerUtil::compute_nlj_spf_storage_compute_parallel_skew(ObOptimizerCo
int64_t &px_expected_work_count)
{
int ret = OB_SUCCESS;
ObSqlSchemaGuard *schema_guard = opt_ctx->get_sql_schema_guard();
const ObTableSchema *table_schema = NULL;
if (OB_NOT_NULL(schema_guard)
&& OB_FAIL(schema_guard->get_table_schema(ref_table_id, table_schema))) {
LOG_WARN("failed to get table schema", K(ret));
} else if (OB_UNLIKELY(NULL == table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get table schame", K(ret));
if (OB_ISNULL(opt_ctx) || OB_ISNULL(esti_table_meta_info)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("get unexpected null", K(opt_ctx), KPC(esti_table_meta_info), K(ret));
} else {
ObParallelBlockRangeTaskParams params;
params.parallelism_ = compute_parallel;
params.expected_task_load_ = table_schema->get_tablet_size() / 1024 <= 0 ? sql::OB_EXPECTED_TASK_LOAD : table_schema->get_tablet_size() / 1024;
//convert from B -> KB
int64_t esti_table_size = (esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024;
double query_range_filter_ratio = esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_;
esti_table_size *= query_range_filter_ratio;
esti_table_size = MAX(0, esti_table_size);
int64_t esti_task_cnt_by_data_size = 0;
if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params, esti_table_size,
esti_task_cnt_by_data_size))) {
LOG_WARN("compute total task count failed", K(ret));
ObSqlSchemaGuard *schema_guard = opt_ctx->get_sql_schema_guard();
const ObTableSchema *table_schema = NULL;
if (OB_NOT_NULL(schema_guard)
&& OB_FAIL(schema_guard->get_table_schema(ref_table_id, table_schema))) {
LOG_WARN("failed to get table schema", K(ret));
} else if (OB_UNLIKELY(NULL == table_schema)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("failed to get table schame", K(ret));
} else {
//if table is so small, px still ensures at least one task per partition
if (esti_table_size == 0) {
px_expected_work_count = esti_table_meta_info->part_count_;
ObParallelBlockRangeTaskParams params;
params.parallelism_ = compute_parallel;
params.expected_task_load_ = table_schema->get_tablet_size() / 1024 <= 0 ?
sql::OB_EXPECTED_TASK_LOAD :
table_schema->get_tablet_size() / 1024;
// convert from B -> KB
int64_t esti_table_size =
(esti_table_meta_info->micro_block_count_ * esti_table_meta_info->micro_block_size_) / 1024;
double query_range_filter_ratio =
esti_table_meta_info->row_count_ / esti_table_meta_info->table_row_count_;
esti_table_size *= query_range_filter_ratio;
esti_table_size = MAX(0, esti_table_size);
int64_t esti_task_cnt_by_data_size = 0;
if (OB_FAIL(ObGranuleUtil::compute_total_task_count(params, esti_table_size,
esti_task_cnt_by_data_size))) {
LOG_WARN("compute total task count failed", K(ret));
} else {
px_expected_work_count = esti_task_cnt_by_data_size;
// if table is so small, px still ensures at least one task per partition
if (esti_table_size == 0) {
px_expected_work_count = esti_table_meta_info->part_count_;
} else {
px_expected_work_count = esti_task_cnt_by_data_size;
}
LOG_TRACE("OPT: get nlj/spf none_all join bound parallel: ", K(esti_table_size),
K(esti_task_cnt_by_data_size), K(px_expected_work_count), K(compute_parallel));
}
LOG_TRACE("OPT: get nlj/spf none_all join bound parallel: ", K(esti_table_size), K(esti_task_cnt_by_data_size), K(px_expected_work_count), K(compute_parallel));
}
}