process failed proxy when dispatching batch sqc failed
This commit is contained in:
@ -1096,24 +1096,19 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx,
|
|||||||
sqc.set_adjoining_root_dfo(true);
|
sqc.set_adjoining_root_dfo(true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 分发 sqc 可能需要重试,
|
// 分发 sqc 可能需要重试,
|
||||||
// 分发 sqc 的 rpc 成功,但 sqc 上无法分配最小个数的 worker 线程,`dispatch_sqc`内部进行重试,
|
// 分发 sqc 的 rpc 成功,但 sqc 上无法分配最小个数的 worker 线程,`dispatch_sqc`内部进行重试,
|
||||||
// 如果多次重试(达到超时时间)都无法成功,不需要再重试整个DFO(因为已经超时)
|
// 如果多次重试(达到超时时间)都无法成功,不需要再重试整个DFO(因为已经超时)
|
||||||
ObPxSqcAsyncProxy proxy(coord_info_.rpc_proxy_, dfo, exec_ctx, phy_plan_ctx, session, phy_plan, sqcs);
|
ObPxSqcAsyncProxy proxy(coord_info_.rpc_proxy_, dfo, exec_ctx, phy_plan_ctx, session, phy_plan, sqcs);
|
||||||
if (OB_FAIL(ret)) {
|
auto process_failed_proxy = [&]() {
|
||||||
} else if (OB_FAIL(proxy.launch_all_rpc_request())) {
|
|
||||||
LOG_WARN("fail to send all init async sqc", K(exec_ctx), K(ret));
|
|
||||||
} else if (OB_FAIL(proxy.wait_all())) {
|
|
||||||
// ret 可是能是 is_data_not_readable_err错误类型,需要通过`deal_with_init_sqc_error`进行处理
|
|
||||||
if (is_data_not_readable_err(ret) || is_server_down_error(ret)) {
|
if (is_data_not_readable_err(ret) || is_server_down_error(ret)) {
|
||||||
ObPxSqcMeta &sqc = *sqcs.at(proxy.get_error_index());
|
ObPxSqcMeta &sqc = *sqcs.at(proxy.get_error_index());
|
||||||
LOG_WARN("fail to wait all async init sqc", K(ret), K(sqc), K(exec_ctx));
|
LOG_WARN("fail to init sqc with proxy", K(ret), K(sqc), K(exec_ctx));
|
||||||
int temp_ret = deal_with_init_sqc_error(exec_ctx, sqc, ret);
|
int temp_ret = deal_with_init_sqc_error(exec_ctx, sqc, ret);
|
||||||
if (temp_ret != OB_SUCCESS) {
|
if (temp_ret != OB_SUCCESS) {
|
||||||
LOG_WARN("fail to deal with init sqc error", K(exec_ctx), K(sqc), K(temp_ret));
|
LOG_WARN("fail to deal with init sqc error", K(exec_ctx), K(sqc), K(temp_ret));
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
LOG_WARN("fail to wait all async init sqc", K(ret), K(exec_ctx));
|
|
||||||
}
|
}
|
||||||
// 对于正确process的sqc, 是需要sqc report的, 否则在后续的wait_running_dfo逻辑中不会等待此sqc结束
|
// 对于正确process的sqc, 是需要sqc report的, 否则在后续的wait_running_dfo逻辑中不会等待此sqc结束
|
||||||
const ObSqcAsyncCB *cb = NULL;
|
const ObSqcAsyncCB *cb = NULL;
|
||||||
@ -1136,6 +1131,15 @@ int ObParallelDfoScheduler::dispatch_sqc(ObExecContext &exec_ctx,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
} else if (OB_FAIL(proxy.launch_all_rpc_request())) {
|
||||||
|
process_failed_proxy();
|
||||||
|
LOG_WARN("fail to send all init async sqc", K(exec_ctx), K(ret));
|
||||||
|
} else if (OB_FAIL(proxy.wait_all())) {
|
||||||
|
// ret 可是能是 is_data_not_readable_err错误类型,需要通过`deal_with_init_sqc_error`进行处理
|
||||||
|
process_failed_proxy();
|
||||||
|
LOG_WARN("fail to wait all async init sqc", K(ret), K(exec_ctx));
|
||||||
} else {
|
} else {
|
||||||
const ObArray<ObSqcAsyncCB *> &callbacks = proxy.get_callbacks();
|
const ObArray<ObSqcAsyncCB *> &callbacks = proxy.get_callbacks();
|
||||||
ARRAY_FOREACH(callbacks, idx) {
|
ARRAY_FOREACH(callbacks, idx) {
|
||||||
|
|||||||
Reference in New Issue
Block a user