fix get_dh_msg hang because sync_wait_all
This commit is contained in:
parent
fe7d87dfc1
commit
517bc412b9
@ -651,8 +651,9 @@ int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t task_cnt = get_task_count();
|
||||
const int64_t curr_rescan_cnt = ATOMIC_LOAD(&provider.rescan_cnt_) + 1;
|
||||
MEM_BARRIER();
|
||||
const int64_t idx = ATOMIC_AAF(&provider.dh_msg_cnt_, 1);
|
||||
const int64_t curr_rescan_cnt = provider.rescan_cnt_ + 1;
|
||||
int64_t loop_cnt = 0;
|
||||
// The whole message should be reset in next rescan, we reset it after last piece msg
|
||||
// firstly do sync wait until all piece threads are in loop
|
||||
@ -662,11 +663,16 @@ int ObPxSQCProxy::sync_wait_all(ObPxDatahubDataProvider &provider)
|
||||
provider.msg_set_ = false;
|
||||
provider.reset(); // reset whole message
|
||||
ATOMIC_AAF(&provider.rescan_cnt_, 1);
|
||||
MEM_BARRIER();
|
||||
ATOMIC_AAF(&provider.dh_msg_cnt_, 1); // to break the loop
|
||||
} else {
|
||||
ob_usleep(1000);
|
||||
if (0 == loop_cnt % 64) {
|
||||
if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
if (OB_UNLIKELY(IS_INTERRUPTED())) {
|
||||
ObInterruptCode &code = GET_INTERRUPT_CODE();
|
||||
ret = code.code_;
|
||||
LOG_WARN("message loop is interrupted", K(code), K(ret));
|
||||
} else if (OB_FAIL(THIS_WORKER.check_status())) {
|
||||
LOG_WARN("failed to sync wait", K(ret), K(task_cnt), K(provider.dh_msg_cnt_));
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user