fix bug: unexpected error due to sqc finish msg processed twice

This commit is contained in:
obdev
2024-02-08 05:20:47 +00:00
committed by ob-robot
parent f4cc8aef80
commit dc6c816851
2 changed files with 35 additions and 12 deletions

View File

@ -279,6 +279,29 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
int ret = OB_SUCCESS;
ObDfo *edge = NULL;
ObPxSqcMeta *sqc = NULL;
if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) {
LOG_WARN("fail find dfo", K(pkt), K(ret));
} else if (OB_ISNULL(edge)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(pkt), K(ret));
} else if (OB_FAIL(edge->get_sqc(pkt.sqc_id_, sqc))) {
LOG_WARN("fail find sqc", K(pkt), K(ret));
} else if (OB_ISNULL(sqc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", K(pkt), K(ret));
} else if (sqc->is_thread_finish()) {
// For virtual tables, if both the mocked SQC finish message and the real SQC finish message are
// processed by the QC, we should skip the processing of the finish message that arrives later.
} else if (OB_FAIL(process_sqc_finish_msg_once(ctx, pkt, sqc, edge))) {
LOG_WARN("failed to process_sqc_finish_msg_once");
}
return ret;
}
int ObPxMsgProc::process_sqc_finish_msg_once(ObExecContext &ctx, const ObPxFinishSqcResultMsg &pkt,
ObPxSqcMeta *sqc, ObDfo *edge)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session = NULL;
ObPhysicalPlanCtx *phy_plan_ctx = NULL;
if (OB_ISNULL(session = ctx.get_my_session())) {
@ -302,22 +325,18 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
"tx_desc", *session->get_tx_desc());
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(coord_info_.dfo_mgr_.find_dfo_edge(pkt.dfo_id_, edge))) {
LOG_WARN("fail find dfo", K(pkt), K(ret));
} else if (OB_FAIL(edge->get_sqc(pkt.sqc_id_, sqc))) {
LOG_WARN("fail find sqc", K(pkt), K(ret));
} else if (OB_ISNULL(edge) || OB_ISNULL(sqc)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("NULL ptr", KP(edge), KP(sqc), K(ret));
} else if (FALSE_IT(sqc->set_need_report(false))) {
} else if (common::OB_INVALID_ID != pkt.temp_table_id_) {
if (OB_FAIL(ctx.add_temp_table_interm_result_ids(pkt.temp_table_id_,
sqc->get_exec_addr(),
pkt.interm_result_ids_))) {
LOG_WARN("failed to add temp table interm result ids.", K(ret));
} else { /*do nothing.*/ }
} else { /*do nothing.*/ }
}
}
if (OB_SUCC(ret)) {
sqc->set_need_report(false);
sqc->set_thread_finish(true);
// process for DM, mark sqc finished, then DM will not detect this sqc again.
if (OB_NOT_NULL(edge->get_detect_cb())) {
#ifdef ERRSIM
if (OB_FAIL(OB_E(EventTable::EN_PX_SLOW_PROCESS_SQC_FINISH_MSG) OB_SUCCESS)) {
@ -331,7 +350,7 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
LOG_WARN("[DM] failed to atomic_set_finished", K(set_finish_ret), K(sqc->get_sqc_addr()));
}
}
sqc->set_thread_finish(true);
// process for virtual table, mock eof buffer let px exit msg loop
if (sqc->is_ignore_vtable_error() && OB_SUCCESS != pkt.rc_
&& ObVirtualTableErrorWhitelist::should_ignore_vtable_error(pkt.rc_)) {
// 如果收到一个sqc finish消息, 如果该sqc涉及虚拟表, 需要忽略所有错误码
@ -350,7 +369,7 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
LOG_TRACE("on_sqc_finish_msg update feedback info",
K(pkt.fb_info_), K(ctx.get_feedback_info()));
}
// mark dfo finished if all sqcs in this dfo is finished
if (OB_SUCC(ret)) {
ObArray<ObPxSqcMeta *> sqcs;
if (OB_FAIL(edge->get_sqcs(sqcs))) {
@ -403,6 +422,7 @@ int ObPxMsgProc::on_sqc_finish_msg(ObExecContext &ctx,
}
}
// schedule_next_dfo if this dfo is finished.
if (OB_SUCC(ret)) {
if (edge->is_thread_finish()) {
if (OB_NOT_NULL(ctx.get_physical_plan_ctx()->get_phy_plan()) &&

View File

@ -251,6 +251,9 @@ private:
ObArray<ObPxSqcMeta *> &sqcs);
int wait_for_dfo_finish(ObDfoMgr &dfo_mgr);
int process_sqc_finish_msg_once(ObExecContext &ctx, const ObPxFinishSqcResultMsg &pkt,
ObPxSqcMeta *sqc, ObDfo *edge);
private:
ObPxCoordInfo &coord_info_;
ObIPxCoordEventListener &listener_;