From b1be3d77c406c38f11d71321b85ed34090950355 Mon Sep 17 00:00:00 2001 From: obdev Date: Fri, 17 Nov 2023 03:11:13 +0000 Subject: [PATCH] Support spf left rows dump and control rpc package size --- .../engine/subquery/ob_subplan_filter_op.cpp | 31 ++++++++++++++----- .../engine/subquery/ob_subplan_filter_op.h | 5 ++- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.cpp b/src/sql/engine/subquery/ob_subplan_filter_op.cpp index b2b92c29ec..ecd81e1e6f 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.cpp +++ b/src/sql/engine/subquery/ob_subplan_filter_op.cpp @@ -606,7 +606,8 @@ int ObSubPlanFilterOp::fill_cur_row_rescan_param() plan_ctx->get_param_store_for_update().at(idx) = params.at(i); } } - OZ(prepare_rescan_params(false)); + int64_t params_size = 0; + OZ(prepare_rescan_params(false, params_size)); return ret; } @@ -706,8 +707,10 @@ int ObSubPlanFilterOp::inner_open() } else if (OB_ISNULL(last_store_row_mem_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("null memory entity returned", K(ret)); - } else if (OB_FAIL(left_rows_.init(UINT64_MAX, tenant_id, ObCtxIds::WORK_AREA))) { + } else if (OB_FAIL(left_rows_.init(MAX_DUMP_SIZE, tenant_id, ObCtxIds::WORK_AREA))) { LOG_WARN("init row store failed", K(ret)); + } else if (OB_FAIL(left_rows_.alloc_dir_id())) { + LOG_WARN("alloc dir id for left rows failed", K(ret)); } else { left_rows_.set_allocator(last_store_row_mem_->get_malloc_allocator()); } @@ -753,6 +756,7 @@ int ObSubPlanFilterOp::inner_get_next_row() int ObSubPlanFilterOp::handle_next_row() { int ret = OB_SUCCESS; + int64_t params_size = 0; if (need_init_before_get_row_) { OZ(prepare_onetime_exprs()); } @@ -817,13 +821,18 @@ int ObSubPlanFilterOp::handle_next_row() } } else if (OB_FAIL(left_rows_.add_row(child_->get_spec().output_, &eval_ctx_))) { LOG_WARN("fail to add row", K(ret)); - } else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true))) { + } else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true, params_size))) { LOG_WARN("fail to prepare rescan params", K(ret)); } else if (MY_SPEC.enable_das_group_rescan_ && OB_FAIL(deep_copy_dynamic_obj())) { LOG_WARN("fail to deep copy dynamic obj", K(ret)); } else { has_row = true; } + + if (enable_left_px_batch_ && params_size >= MAX_PX_RESCAN_PARAMS_SIZE) { + LOG_TRACE("px rescan rpc package is too large", K(params_size), K(PX_RESCAN_BATCH_ROW_COUNT - batch_count)); + break; + } } if (OB_SUCC(ret)) { // back expr datum to last_store_row @@ -871,7 +880,7 @@ int ObSubPlanFilterOp::handle_next_row() if (OB_ITER_END != ret) { LOG_WARN("get next row from child operator failed", K(ret)); } - } else if (OB_FAIL(prepare_rescan_params(false))) { + } else if (OB_FAIL(prepare_rescan_params(false, params_size))) { LOG_WARN("fail to prepare rescan params", K(ret)); } @@ -894,6 +903,7 @@ int ObSubPlanFilterOp::handle_next_batch_with_px_rescan(const int64_t op_max_bat bool stop_fetch = false; ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_); uint64_t left_rows_total_cnt = 0; + int64_t params_size = 0; if (left_rows_iter_.is_valid() && left_rows_iter_.has_next()) { // fetch data from left store } else { @@ -921,11 +931,16 @@ int ObSubPlanFilterOp::handle_next_batch_with_px_rescan(const int64_t op_max_bat for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_; l_idx++) { if (child_brs->skip_->exist(l_idx)) { continue; } guard.set_batch_idx(l_idx); - if (OB_FAIL(prepare_rescan_params(true))) { + if (OB_FAIL(prepare_rescan_params(true, params_size))) { LOG_WARN("prepare rescan params failed", K(ret)); } } } + + if (params_size >= MAX_PX_RESCAN_PARAMS_SIZE) { + LOG_TRACE("px rescan rpc package is too large", K(params_size), K(left_rows_total_cnt)); + break; + } } if (OB_SUCC(ret)) { if (!child_brs->end_) { @@ -1153,6 +1168,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt) { int ret = OB_SUCCESS; int64_t op_max_batch_size = min(max_row_cnt, MY_SPEC.max_batch_size_); + int64_t params_size = 0; if (need_init_before_get_row_) { OZ(prepare_onetime_exprs()); } @@ -1191,7 +1207,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt) for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_; l_idx++) { if (child_brs->skip_->exist(l_idx)) { continue; } guard.set_batch_idx(l_idx); - if (OB_FAIL(prepare_rescan_params(false))) { + if (OB_FAIL(prepare_rescan_params(false, params_size))) { LOG_WARN("prepare rescan params failed", K(ret)); } else { if (need_init_before_get_row_) { @@ -1238,7 +1254,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt) return ret; } -int ObSubPlanFilterOp::prepare_rescan_params(bool need_save) +int ObSubPlanFilterOp::prepare_rescan_params(bool need_save, int64_t& params_size) { int ret = OB_SUCCESS; ObObjParam *param = NULL; @@ -1258,6 +1274,7 @@ int ObSubPlanFilterOp::prepare_rescan_params(bool need_save) int64_t expr_idx = 0; LOG_DEBUG("prepare_rescan_params", KPC(param), K(i)); OZ(batch_rescan_ctl_.params_.deep_copy_param(*param, copy_result)); + params_size += copy_result.get_deep_copy_size(); OZ(cur_params_.push_back(copy_result)); OZ(cur_param_idxs_.push_back(MY_SPEC.rescan_params_.at(i).param_idx_)); CK(OB_NOT_NULL(plan_ctx->get_phy_plan())); diff --git a/src/sql/engine/subquery/ob_subplan_filter_op.h b/src/sql/engine/subquery/ob_subplan_filter_op.h index 0c2989ed2d..1d5dae24ce 100644 --- a/src/sql/engine/subquery/ob_subplan_filter_op.h +++ b/src/sql/engine/subquery/ob_subplan_filter_op.h @@ -239,7 +239,7 @@ private: } } - int prepare_rescan_params(bool save); + int prepare_rescan_params(bool save, int64_t ¶ms_size); int prepare_onetime_exprs(); int prepare_onetime_exprs_inner(); int handle_update_set(); @@ -282,6 +282,9 @@ private: common::ObSEArray subplan_iters_to_check_; lib::MemoryContext last_store_row_mem_; ObBatchResultHolder brs_holder_; +public: + static const int64_t MAX_PX_RESCAN_PARAMS_SIZE = 4 << 20; // 4M + static const int64_t MAX_DUMP_SIZE = 16 << 20; // 16M }; class GroupParamBackupGuard