From b7e97b66319887dd9e2dd5f09f9eb60f57e2af25 Mon Sep 17 00:00:00 2001 From: obdev Date: Thu, 21 Nov 2024 20:18:41 +0000 Subject: [PATCH] Fix the concurrency error that occurs when creating a group --- src/share/io/ob_io_define.cpp | 10 +++++++--- src/share/io/ob_io_struct.cpp | 6 +++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index e0a1fcc0f..a0526b1f2 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -2474,9 +2474,13 @@ int ObMClockQueue::remove_from_heap(ObPhyQueue *phy_queue) } else if (-1 == phy_queue->reservation_pos_ || (-1 == phy_queue->limitation_pos_ && -1 == phy_queue->proportion_pos_)) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid status", K(phy_queue->reservation_pos_), - K(phy_queue->limitation_pos_), - K(phy_queue->proportion_pos_)); + LOG_WARN("phy_queue is not in heaps of reservation, proportion and limitation", + K(phy_queue), + K(*this), + K(ret), + K(phy_queue->reservation_pos_), + K(phy_queue->limitation_pos_), + K(phy_queue->proportion_pos_)); } else if (OB_FAIL(r_heap_.remove(phy_queue))) { LOG_WARN("remove phy queue from r heap failed", K(ret)); } else if (FALSE_IT(phy_queue->reservation_pos_ = -1)) { diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index b5a5ec885..4364578ea 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -1341,12 +1341,12 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group } else if (FALSE_IT(tmp_phyqueue = new (buf) ObPhyQueue())) { } else if (OB_FAIL(tmp_phyqueue->init(i))) { LOG_WARN("init io phy_queue failed", K(ret), K(i), K(*tmp_phyqueue)); - } else if (OB_FAIL(io_group_queues->group_phy_queues_.push_back(tmp_phyqueue))) { - LOG_WARN("push back io sender failed", K(ret), K(i), K(*tmp_phyqueue)); } else if (OB_FAIL(enqueue_phy_queue(*tmp_phyqueue))) { LOG_WARN("new queue into heap failed", K(ret)); + } else if (OB_FAIL(io_group_queues->group_phy_queues_.push_back(tmp_phyqueue))) { + LOG_WARN("push back io sender failed", K(ret), K(i), K(*tmp_phyqueue)); } else { - LOG_INFO("add phy queue success", K(tenant_id), K(cur_num), K(group_num)); + LOG_INFO("add phy queue success", K(tenant_id), K(i), K(tmp_phyqueue), K(cur_num), K(group_num)); } if (OB_FAIL(ret) && nullptr != tmp_phyqueue) { tmp_phyqueue->~ObPhyQueue();