diff --git a/src/sql/engine/px/ob_dfo_scheduler.cpp b/src/sql/engine/px/ob_dfo_scheduler.cpp index 35d3b386d5..4133a9526b 100644 --- a/src/sql/engine/px/ob_dfo_scheduler.cpp +++ b/src/sql/engine/px/ob_dfo_scheduler.cpp @@ -1096,24 +1096,19 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx, sqc.set_adjoining_root_dfo(true); } } + // 分发 sqc 可能需要重试, // 分发 sqc 的 rpc 成功,但 sqc 上无法分配最小个数的 worker 线程,`dispatch_sqc`内部进行重试, // 如果多次重试(达到超时时间)都无法成功,不需要再重试整个DFO(因为已经超时) ObPxSqcAsyncProxy proxy(coord_info_.rpc_proxy_, dfo, exec_ctx, phy_plan_ctx, session, phy_plan, sqcs); - if (OB_FAIL(ret)) { - } else if (OB_FAIL(proxy.launch_all_rpc_request())) { - LOG_WARN("fail to send all init async sqc", K(exec_ctx), K(ret)); - } else if (OB_FAIL(proxy.wait_all())) { - // ret 可是能是 is_data_not_readable_err错误类型,需要通过`deal_with_init_sqc_error`进行处理 + auto process_failed_proxy = [&]() { if (is_data_not_readable_err(ret) || is_server_down_error(ret)) { ObPxSqcMeta &sqc = *sqcs.at(proxy.get_error_index()); - LOG_WARN("fail to wait all async init sqc", K(ret), K(sqc), K(exec_ctx)); + LOG_WARN("fail to init sqc with proxy", K(ret), K(sqc), K(exec_ctx)); int temp_ret = deal_with_init_sqc_error(exec_ctx, sqc, ret); if (temp_ret != OB_SUCCESS) { LOG_WARN("fail to deal with init sqc error", K(exec_ctx), K(sqc), K(temp_ret)); } - } else { - LOG_WARN("fail to wait all async init sqc", K(ret), K(exec_ctx)); } // 对于正确process的sqc, 是需要sqc report的, 否则在后续的wait_running_dfo逻辑中不会等待此sqc结束 const ObSqcAsyncCB *cb = NULL; @@ -1136,6 +1131,15 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx, } } } + }; + if (OB_FAIL(ret)) { + } else if (OB_FAIL(proxy.launch_all_rpc_request())) { + process_failed_proxy(); + LOG_WARN("fail to send all init async sqc", K(exec_ctx), K(ret)); + } else if (OB_FAIL(proxy.wait_all())) { + // ret 可是能是 is_data_not_readable_err错误类型,需要通过`deal_with_init_sqc_error`进行处理 + process_failed_proxy(); + LOG_WARN("fail to wait all async init sqc", K(ret), K(exec_ctx)); } else { const ObArray &callbacks = proxy.get_callbacks(); ARRAY_FOREACH(callbacks, idx) {