diff --git a/src/sql/engine/px/ob_px_sqc_proxy.cpp b/src/sql/engine/px/ob_px_sqc_proxy.cpp index a0b72ce08..a981a24d4 100644 --- a/src/sql/engine/px/ob_px_sqc_proxy.cpp +++ b/src/sql/engine/px/ob_px_sqc_proxy.cpp @@ -651,8 +651,9 @@ int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider) { int ret = OB_SUCCESS; const int64_t task_cnt = get_task_count(); + const int64_t curr_rescan_cnt = ATOMIC_LOAD(&provider.rescan_cnt_) + 1; + MEM_BARRIER(); const int64_t idx = ATOMIC_AAF(&provider.dh_msg_cnt_, 1); - const int64_t curr_rescan_cnt = provider.rescan_cnt_ + 1; int64_t loop_cnt = 0; // The whole message should be reset in next rescan, we reset it after last piece msg // firstly do sync wait until all piece threads are in loop @@ -662,11 +663,16 @@ int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider) provider.msg_set_ = false; provider.reset(); // reset whole message ATOMIC_AAF(&provider.rescan_cnt_, 1); + MEM_BARRIER(); ATOMIC_AAF(&provider.dh_msg_cnt_, 1); // to break the loop } else { ob_usleep(1000); if (0 == loop_cnt % 64) { - if (OB_FAIL(THIS_WORKER.check_status())) { + if (OB_UNLIKELY(IS_INTERRUPTED())) { + ObInterruptCode &code = GET_INTERRUPT_CODE(); + ret = code.code_; + LOG_WARN("message loop is interrupted", K(code), K(ret)); + } else if (OB_FAIL(THIS_WORKER.check_status())) { LOG_WARN("failed to sync wait", K(ret), K(task_cnt), K(provider.dh_msg_cnt_)); } }