Support spf left rows dump and control rpc package size

This commit is contained in:
obdev
2023-11-17 03:11:13 +00:00
committed by ob-robot
parent 761938037c
commit b1be3d77c4
2 changed files with 28 additions and 8 deletions

View File

@ -606,7 +606,8 @@ int ObSubPlanFilterOp::fill_cur_row_rescan_param()
plan_ctx->get_param_store_for_update().at(idx) = params.at(i);
}
}
OZ(prepare_rescan_params(false));
int64_t params_size = 0;
OZ(prepare_rescan_params(false, params_size));
return ret;
}
@ -706,8 +707,10 @@ int ObSubPlanFilterOp::inner_open()
} else if (OB_ISNULL(last_store_row_mem_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null memory entity returned", K(ret));
} else if (OB_FAIL(left_rows_.init(UINT64_MAX, tenant_id, ObCtxIds::WORK_AREA))) {
} else if (OB_FAIL(left_rows_.init(MAX_DUMP_SIZE, tenant_id, ObCtxIds::WORK_AREA))) {
LOG_WARN("init row store failed", K(ret));
} else if (OB_FAIL(left_rows_.alloc_dir_id())) {
LOG_WARN("alloc dir id for left rows failed", K(ret));
} else {
left_rows_.set_allocator(last_store_row_mem_->get_malloc_allocator());
}
@ -753,6 +756,7 @@ int ObSubPlanFilterOp::inner_get_next_row()
int ObSubPlanFilterOp::handle_next_row()
{
int ret = OB_SUCCESS;
int64_t params_size = 0;
if (need_init_before_get_row_) {
OZ(prepare_onetime_exprs());
}
@ -817,13 +821,18 @@ int ObSubPlanFilterOp::handle_next_row()
}
} else if (OB_FAIL(left_rows_.add_row(child_->get_spec().output_, &eval_ctx_))) {
LOG_WARN("fail to add row", K(ret));
} else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true))) {
} else if (enable_left_px_batch_ && OB_FAIL(prepare_rescan_params(true, params_size))) {
LOG_WARN("fail to prepare rescan params", K(ret));
} else if (MY_SPEC.enable_das_group_rescan_ && OB_FAIL(deep_copy_dynamic_obj())) {
LOG_WARN("fail to deep copy dynamic obj", K(ret));
} else {
has_row = true;
}
if (enable_left_px_batch_ && params_size >= MAX_PX_RESCAN_PARAMS_SIZE) {
LOG_TRACE("px rescan rpc package is too large", K(params_size), K(PX_RESCAN_BATCH_ROW_COUNT - batch_count));
break;
}
}
if (OB_SUCC(ret)) {
// back expr datum to last_store_row
@ -871,7 +880,7 @@ int ObSubPlanFilterOp::handle_next_row()
if (OB_ITER_END != ret) {
LOG_WARN("get next row from child operator failed", K(ret));
}
} else if (OB_FAIL(prepare_rescan_params(false))) {
} else if (OB_FAIL(prepare_rescan_params(false, params_size))) {
LOG_WARN("fail to prepare rescan params", K(ret));
}
@ -894,6 +903,7 @@ int ObSubPlanFilterOp::handle_next_batch_with_px_rescan(const int64_t op_max_bat
bool stop_fetch = false;
ObEvalCtx::BatchInfoScopeGuard guard(eval_ctx_);
uint64_t left_rows_total_cnt = 0;
int64_t params_size = 0;
if (left_rows_iter_.is_valid() && left_rows_iter_.has_next()) {
// fetch data from left store
} else {
@ -921,11 +931,16 @@ int ObSubPlanFilterOp::handle_next_batch_with_px_rescan(const int64_t op_max_bat
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_; l_idx++) {
if (child_brs->skip_->exist(l_idx)) { continue; }
guard.set_batch_idx(l_idx);
if (OB_FAIL(prepare_rescan_params(true))) {
if (OB_FAIL(prepare_rescan_params(true, params_size))) {
LOG_WARN("prepare rescan params failed", K(ret));
}
}
}
if (params_size >= MAX_PX_RESCAN_PARAMS_SIZE) {
LOG_TRACE("px rescan rpc package is too large", K(params_size), K(left_rows_total_cnt));
break;
}
}
if (OB_SUCC(ret)) {
if (!child_brs->end_) {
@ -1153,6 +1168,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
{
int ret = OB_SUCCESS;
int64_t op_max_batch_size = min(max_row_cnt, MY_SPEC.max_batch_size_);
int64_t params_size = 0;
if (need_init_before_get_row_) {
OZ(prepare_onetime_exprs());
}
@ -1191,7 +1207,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
for (int64_t l_idx = 0; OB_SUCC(ret) && l_idx < child_brs->size_; l_idx++) {
if (child_brs->skip_->exist(l_idx)) { continue; }
guard.set_batch_idx(l_idx);
if (OB_FAIL(prepare_rescan_params(false))) {
if (OB_FAIL(prepare_rescan_params(false, params_size))) {
LOG_WARN("prepare rescan params failed", K(ret));
} else {
if (need_init_before_get_row_) {
@ -1238,7 +1254,7 @@ int ObSubPlanFilterOp::inner_get_next_batch(const int64_t max_row_cnt)
return ret;
}
int ObSubPlanFilterOp::prepare_rescan_params(bool need_save)
int ObSubPlanFilterOp::prepare_rescan_params(bool need_save, int64_t& params_size)
{
int ret = OB_SUCCESS;
ObObjParam *param = NULL;
@ -1258,6 +1274,7 @@ int ObSubPlanFilterOp::prepare_rescan_params(bool need_save)
int64_t expr_idx = 0;
LOG_DEBUG("prepare_rescan_params", KPC(param), K(i));
OZ(batch_rescan_ctl_.params_.deep_copy_param(*param, copy_result));
params_size += copy_result.get_deep_copy_size();
OZ(cur_params_.push_back(copy_result));
OZ(cur_param_idxs_.push_back(MY_SPEC.rescan_params_.at(i).param_idx_));
CK(OB_NOT_NULL(plan_ctx->get_phy_plan()));

View File

@ -239,7 +239,7 @@ private:
}
}
int prepare_rescan_params(bool save);
int prepare_rescan_params(bool save, int64_t &params_size);
int prepare_onetime_exprs();
int prepare_onetime_exprs_inner();
int handle_update_set();
@ -282,6 +282,9 @@ private:
common::ObSEArray<Iterator*, 8> subplan_iters_to_check_;
lib::MemoryContext last_store_row_mem_;
ObBatchResultHolder brs_holder_;
public:
static const int64_t MAX_PX_RESCAN_PARAMS_SIZE = 4 << 20; // 4M
static const int64_t MAX_DUMP_SIZE = 16 << 20; // 16M
};
class GroupParamBackupGuard