[CP] fix wait msg hang because of broadcast msg can not received
This commit is contained in:
@ -183,7 +183,7 @@ int ObPxTransmitChProvider::wait_msg(int64_t timeout_ts)
|
|||||||
if (!msg_set_) {
|
if (!msg_set_) {
|
||||||
ObThreadCondGuard guard(msg_ready_cond_);
|
ObThreadCondGuard guard(msg_ready_cond_);
|
||||||
// wait 1 ms or notified.
|
// wait 1 ms or notified.
|
||||||
int tmp_ret = msg_ready_cond_.wait_us(1000);
|
msg_ready_cond_.wait_us(1000);
|
||||||
|
|
||||||
wait_count++;
|
wait_count++;
|
||||||
// trace log each 100ms
|
// trace log each 100ms
|
||||||
@ -197,7 +197,7 @@ int ObPxTransmitChProvider::wait_msg(int64_t timeout_ts)
|
|||||||
ret = code.code_;
|
ret = code.code_;
|
||||||
LOG_WARN("transmit channel provider wait msg loop is interrupted", K(code), K(ret));
|
LOG_WARN("transmit channel provider wait msg loop is interrupted", K(code), K(ret));
|
||||||
break;
|
break;
|
||||||
} else if (OB_SUCCESS == tmp_ret && !msg_set_) { // wake up by leader, retry
|
} else if (!msg_set_) { // wake up by leader, retry
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
LOG_TRACE("wake up by leader, retry");
|
LOG_TRACE("wake up by leader, retry");
|
||||||
break;
|
break;
|
||||||
@ -375,10 +375,9 @@ int ObPxReceiveChProvider::wait_msg(int64_t child_dfo_id, int64_t timeout_ts)
|
|||||||
}
|
}
|
||||||
if (OB_SUCC(ret)) {
|
if (OB_SUCC(ret)) {
|
||||||
while (!msg_set_[child_dfo_id]) {
|
while (!msg_set_[child_dfo_id]) {
|
||||||
int tmp_ret = OB_SUCCESS;
|
|
||||||
msg_ready_cond_.lock();
|
msg_ready_cond_.lock();
|
||||||
if (!msg_set_[child_dfo_id]) {
|
if (!msg_set_[child_dfo_id]) {
|
||||||
tmp_ret = msg_ready_cond_.wait_us(1 * 1000); /* 1 ms */
|
msg_ready_cond_.wait_us(1 * 1000); /* 1 ms */
|
||||||
}
|
}
|
||||||
msg_ready_cond_.unlock();
|
msg_ready_cond_.unlock();
|
||||||
if (!msg_set_[child_dfo_id]) {
|
if (!msg_set_[child_dfo_id]) {
|
||||||
@ -395,7 +394,7 @@ int ObPxReceiveChProvider::wait_msg(int64_t child_dfo_id, int64_t timeout_ts)
|
|||||||
LOG_WARN("receive channel provider wait msg loop is interrupted",
|
LOG_WARN("receive channel provider wait msg loop is interrupted",
|
||||||
K(child_dfo_id), K(wait_count), K(code), K(ret));
|
K(child_dfo_id), K(wait_count), K(code), K(ret));
|
||||||
break;
|
break;
|
||||||
} else if (OB_SUCCESS == tmp_ret) {
|
} else {
|
||||||
ret = OB_EAGAIN;
|
ret = OB_EAGAIN;
|
||||||
LOG_TRACE("follower is wake up by leader, retry");
|
LOG_TRACE("follower is wake up by leader, retry");
|
||||||
break;
|
break;
|
||||||
|
|||||||
Reference in New Issue
Block a user