Solve the problem of read-write conflict of the msg_set structure.

This commit is contained in:
qingsuijiu 2024-05-22 13:59:50 +00:00 committed by ob-robot
parent 19466707ca
commit f6820b429e
2 changed files with 9 additions and 4 deletions

View File

@ -270,7 +270,7 @@ int ObPxReceiveChProvider::get_data_ch_nonblock(
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid msg", K(child_dfo_id), K(ret));
} else if (OB_FAIL(ObPxChProviderUtil::check_status(timeout_ts, qc_addr, query_start_time))) {
// nop
LOG_WARN("Fail to check status", K(child_dfo_id), K(msg_set_), K(msgs_), K(ret));
} else {
ObLockGuard<ObSpinLock> lock_guard(lock_);
ARRAY_FOREACH_X(msgs_, idx, cnt, OB_SUCC(ret) && !found) {
@ -329,7 +329,7 @@ int ObPxReceiveChProvider::get_data_ch(
}
if (OB_SUCC(ret) && !found) {
ret = OB_ENTRY_NOT_EXIST;
LOG_WARN("no receive ch found for dfo", K(child_dfo_id), K(ret));
LOG_WARN("no receive ch found for dfo", K(child_dfo_id), K(msg_set_), K(msgs_), K(ret));
}
}
return ret;
@ -374,7 +374,7 @@ int ObPxReceiveChProvider::wait_msg(int64_t child_dfo_id, int64_t timeout_ts)
}
}
if (OB_SUCC(ret)) {
while (!msg_set_[child_dfo_id]) {
while (!is_msg_set(child_dfo_id)) {
msg_ready_cond_.lock();
if (!msg_set_[child_dfo_id]) {
msg_ready_cond_.wait_us(1 * 1000); /* 1 ms */
@ -392,7 +392,7 @@ int ObPxReceiveChProvider::wait_msg(int64_t child_dfo_id, int64_t timeout_ts)
ObInterruptCode code = GET_INTERRUPT_CODE();
ret = code.code_;
LOG_WARN("receive channel provider wait msg loop is interrupted",
K(child_dfo_id), K(wait_count), K(code), K(ret));
K(child_dfo_id), K(wait_count), K(code), K(msg_set_[child_dfo_id]), K(ret));
break;
} else {
ret = OB_EAGAIN;

View File

@ -92,6 +92,11 @@ private:
/* functions */
int wait_msg(int64_t child_dfo_id, int64_t timeout_ts);
int reserve_msg_set_array_size(int64_t size);
bool is_msg_set(int64_t child_dfo_id)
{
ObLockGuard<ObSpinLock> lock_guard(lock_);
return msg_set_[child_dfo_id];
}
private:
static const int64_t MSG_SET_DEFAULT_SIZE = 16;
private: