[CP] fix: gts callback may be skipped when concurrent gts workers iterate the callback queue

This commit is contained in:
obdev
2024-02-08 01:20:30 +00:00
committed by ob-robot
parent 9821c7ab6e
commit dcde58534b
3 changed files with 15 additions and 4 deletions

View File

@ -45,6 +45,7 @@ public:
int get_gts(const MonotonicTs stc, int64_t &gts, MonotonicTs &receive_gts_ts, bool &need_send_rpc) const;
int get_srr_and_gts_safe(MonotonicTs &srr, int64_t &gts, MonotonicTs &receive_gts_ts) const;
int update_latest_srr(const MonotonicTs latest_srr);
bool no_rpc_on_road() const { return ATOMIC_LOAD(&latest_srr_.mts_) == ATOMIC_LOAD(&srr_.mts_); }
TO_STRING_KV(K_(srr), K_(gts), K_(latest_srr));
private:

View File

@ -668,7 +668,17 @@ int ObGtsSource::handle_gts_result(const uint64_t tenant_id, const int64_t queue
} else {
ObGTSTaskQueue *queue = &(queue_[queue_index]);
if (OB_FAIL(queue->foreach_task(srr, gts, receive_gts_ts))) {
TRANS_LOG(WARN, "iterate task failed", KR(ret), K(queue_index));
if (OB_EAGAIN == ret) {
ret = OB_SUCCESS;
if (gts_local_cache_.no_rpc_on_road()) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = refresh_gts_(false))) {
TRANS_LOG(WARN, "refresh gts failed", K(tmp_ret));
}
}
} else {
TRANS_LOG(WARN, "iterate task failed", KR(ret), K(queue_index));
}
}
}
return ret;

View File

@ -108,9 +108,9 @@ int ObGTSTaskQueue::foreach_task(const MonotonicTs srr,
TRANS_LOG(WARN, "unknown gts task type", KR(ret), K_(task_type));
}
if (OB_EAGAIN == ret) {
// rewrite ret
ret = OB_SUCCESS;
if (OB_FAIL(queue_.push(task))) {
int tmp_ret = OB_SUCCESS;
if (OB_SUCCESS != (tmp_ret = queue_.push(task))) {
ret = tmp_ret;
TRANS_LOG(ERROR, "push gts task failed", KR(ret), KP(task));
} else {
TRANS_LOG(DEBUG, "push back gts task", KP(task));