Remove Rpc Worker Flag In Px

This commit is contained in:
qianchanger
2023-02-06 23:49:45 +08:00
committed by ob-robot
parent 9b302b8660
commit f6a68bca07
7 changed files with 15 additions and 44 deletions

View File

@ -171,8 +171,7 @@ int ObPxSubCoord::init_exec_env(ObExecContext &exec_ctx)
} else if (OB_ISNULL(plan_ctx = GET_PHY_PLAN_CTX(exec_ctx))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("deserialized exec ctx without phy plan ctx set. Unexpected", K(ret));
} else if (OB_FAIL(init_first_buffer_cache(sqc_arg_.sqc_.is_rpc_worker(),
sqc_arg_.des_phy_plan_->get_px_dop()))) {
} else if (OB_FAIL(init_first_buffer_cache(sqc_arg_.des_phy_plan_->get_px_dop()))) {
LOG_WARN("failed to init first buffer cache", K(ret));
} else {
session->set_cur_phy_plan(sqc_arg_.des_phy_plan_);
@ -561,11 +560,10 @@ int ObPxSubCoord::dispatch_tasks(ObPxRpcInitSqcArgs &sqc_arg, ObSqcCtx &sqc_ctx,
if (OB_ISNULL(sqc_arg.exec_ctx_)
|| OB_ISNULL(sqc_arg.sqc_handler_)
|| OB_ISNULL(sqc_arg.op_spec_root_)
|| OB_ISNULL(sqc_arg.des_phy_plan_)
|| (sqc.is_rpc_worker() && 1 != sqc.get_task_count())) {
|| OB_ISNULL(sqc_arg.des_phy_plan_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Invalid sqc args", K(ret), K(sqc));
} else if (sqc.is_rpc_worker() || is_fast_sqc) {
} else if (is_fast_sqc) {
dispatch_worker_count = 0;
ret = dispatch_task_to_local_thread(sqc_arg, sqc_ctx, sqc);
} else {
@ -745,10 +743,10 @@ int ObPxSubCoord::end_process()
return ret;
}
int ObPxSubCoord::init_first_buffer_cache(bool is_rpc_worker, int64_t px_dop)
int ObPxSubCoord::init_first_buffer_cache(int64_t px_dop)
{
int ret = OB_SUCCESS;
int64_t dop = is_rpc_worker ? 1 * px_dop : px_dop * px_dop;
int64_t dop = px_dop * px_dop;
if (OB_ISNULL(sqc_arg_.exec_ctx_) || OB_ISNULL(sqc_arg_.exec_ctx_->get_my_session())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("null unexpected", K(ret));