diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index 62285c1db3..e15415b8c3 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -709,7 +709,6 @@ void ObIORequest::dec_out_ref() ObPhyQueue::ObPhyQueue() : is_inited_(false), - phy_queue_(), reservation_ts_(INT_MAX64), category_limitation_ts_(INT_MAX64), tenant_limitation_ts_(INT_MAX64), @@ -720,7 +719,8 @@ ObPhyQueue::ObPhyQueue() reservation_pos_(-1), category_limitation_pos_(-1), tenant_limitation_pos_(-1), - proportion_pos_(-1) + proportion_pos_(-1), + req_list_() { } @@ -736,8 +736,6 @@ int ObPhyQueue::init(const int index) if (OB_UNLIKELY(is_inited_)) { ret = OB_INIT_TWICE; LOG_WARN("phy queue init twice", K(ret), K(is_inited_)); - } else if (OB_FAIL(phy_queue_.init(SINGLE_QUEUE_DEPTH))) { - LOG_WARN("phy queue init failed", K(ret)); } else if (index < 0 || index > static_cast(ObIOCategory::MAX_CATEGORY)){ ret = OB_INVALID_ARGUMENT; LOG_WARN("index out of boundary", K(ret), K(index)); @@ -754,7 +752,6 @@ int ObPhyQueue::init(const int index) void ObPhyQueue::destroy() { is_inited_ = false; - phy_queue_.destroy(); reservation_ts_ = INT_MAX64; category_limitation_ts_ = INT_MAX64; tenant_limitation_ts_ = INT_MAX64; @@ -1293,32 +1290,29 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts) if (OB_ISNULL(tmp_phy_queue)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("phy_queue is null", K(ret), KP(tmp_phy_queue)); - } else if (tmp_phy_queue->phy_queue_.get_total() <= 0) { + } else if (tmp_phy_queue->req_list_.is_empty()) { ret = OB_ENTRY_NOT_EXIST; } else if (tmp_phy_queue->reservation_ts_ <= current_ts) { //R schedule if(OB_FAIL(remove_from_heap(tmp_phy_queue))) { LOG_WARN("remove phy queue from heap failed(R schedule)", K(ret)); } else { - if (OB_FAIL(tmp_phy_queue->phy_queue_.pop(req))) { - LOG_WARN("pop req from r_heap failed", K(ret)); - } else if (OB_ISNULL(req)) { + req = tmp_phy_queue->req_list_.remove_first(); + if (OB_ISNULL(req)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("request is null", K(ret), KP(req)); } else { req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); - if (tmp_phy_queue->phy_queue_.get_total() <= 0) { + if (tmp_phy_queue->req_list_.is_empty()) { tmp_phy_queue->reset_time_info(); } else if (OB_NOT_NULL(req->tenant_io_mgr_.get_ptr())) { ObTenantIOClock *io_clock = static_cast(req->tenant_io_mgr_.get_ptr()->get_io_clock()); - ObIORequest *next_req = nullptr; - if (OB_FAIL(tmp_phy_queue->phy_queue_.head_unsafe(next_req))) { - LOG_WARN("get next req failed", K(ret), KP(next_req)); - } else if (OB_ISNULL(next_req)) { + ObIORequest *next_req = tmp_phy_queue->req_list_.get_first(); + if (OB_ISNULL(next_req)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get null next_req", KP(next_req)); } else if (OB_FAIL(io_clock->calc_phyqueue_clock(tmp_phy_queue, *next_req))) { - LOG_WARN("calc phyqueue clock failed", K(ret)); + LOG_WARN("calc phyqueue clock failed", K(ret), KPC(next_req)); } } } @@ -1372,7 +1366,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& int64_t iter_count = 0; ObPhyQueue *tmp_phy_queue = nullptr; req = nullptr; - while (OB_SUCC(ret) && !cl_heap_.empty() && cl_heap_.top()->phy_queue_.get_total() > 0) { + while (OB_SUCC(ret) && !cl_heap_.empty() && !cl_heap_.top()->req_list_.is_empty()) { tmp_phy_queue = cl_heap_.top(); deadline_ts = 0 == iter_count ? tmp_phy_queue->tenant_limitation_ts_ : deadline_ts; ++iter_count; @@ -1395,7 +1389,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& } } iter_count = 0; - while (OB_SUCC(ret) && !tl_heap_.empty() && tl_heap_.top()->phy_queue_.get_total() > 0) { + while (OB_SUCC(ret) && !tl_heap_.empty() && !tl_heap_.top()->req_list_.is_empty()) { tmp_phy_queue = tl_heap_.top(); if (0 == iter_count) { if (0 == deadline_ts) { @@ -1421,26 +1415,23 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& if (OB_ISNULL(tmp_phy_queue)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("phy_queue is null", K(ret)); - } else if (tmp_phy_queue->phy_queue_.get_total() > 0) { + } else if (!tmp_phy_queue->req_list_.is_empty()) { if (OB_FAIL(remove_from_heap(tmp_phy_queue))) { LOG_WARN("remove phy queue from heap failed(P schedule)", K(ret)); } else { - if (OB_FAIL(tmp_phy_queue->phy_queue_.pop(req))) { - LOG_WARN("pop req from ready_heap failed"); - } else if (OB_ISNULL(req)) { + req = tmp_phy_queue->req_list_.remove_first(); + if (OB_ISNULL(req)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("req is null", K(ret), KP(req)); } else { req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); LOG_DEBUG("req pop from phy queue succcess(P schedule)", KP(req), K(iter_count), "time_cost", ObTimeUtility::fast_current_time() - current_ts, K(ready_heap_.count()), K(current_ts)); - if (tmp_phy_queue->phy_queue_.get_total() <= 0) { + if (tmp_phy_queue->req_list_.is_empty()) { tmp_phy_queue->reset_time_info(); } else if (OB_NOT_NULL(req->tenant_io_mgr_.get_ptr())) { ObTenantIOClock *io_clock = static_cast(req->tenant_io_mgr_.get_ptr()->get_io_clock()); - ObIORequest *next_req = nullptr; - if (OB_FAIL(tmp_phy_queue->phy_queue_.head_unsafe(next_req))) { - LOG_WARN("get next req failed", K(ret), K(next_req)); - } else if (OB_ISNULL(next_req)) { + ObIORequest *next_req = tmp_phy_queue->req_list_.get_first(); + if (OB_ISNULL(next_req)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("get null next_req", KP(next_req)); } else { @@ -1462,7 +1453,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& } } else { ret = OB_EAGAIN; - if (!r_heap_.empty() && r_heap_.top()->phy_queue_.get_total() > 0) { + if (!r_heap_.empty() && !r_heap_.top()->req_list_.is_empty()) { ObPhyQueue *next_tmp_phy_queue = r_heap_.top(); if (0 == deadline_ts) { deadline_ts = next_tmp_phy_queue->reservation_ts_; diff --git a/src/share/io/ob_io_define.h b/src/share/io/ob_io_define.h index d5ea045732..e7476bc78d 100644 --- a/src/share/io/ob_io_define.h +++ b/src/share/io/ob_io_define.h @@ -18,7 +18,7 @@ #include "lib/lock/ob_thread_cond.h" #include "lib/container/ob_rbtree.h" #include "common/storage/ob_io_device.h" -#include "lib/queue/ob_fixed_queue.h" +#include "lib/list/ob_list.h" #include "lib/container/ob_heap.h" namespace oceanbase @@ -207,7 +207,7 @@ class ObIOChannel; class ObIOSender; class ObIOUsage; -class ObIORequest +class ObIORequest : public common::ObDLinkBase { public: ObIORequest(); @@ -272,10 +272,9 @@ public: void reset_time_info(); void reset_queue_info(); public: + typedef common::ObDList IOReqList; TO_STRING_KV(K_(reservation_ts), K_(category_limitation_ts), K_(tenant_limitation_ts)); bool is_inited_; - ObFixedQueue phy_queue_; - static const int32_t SINGLE_QUEUE_DEPTH = 10000; int64_t reservation_ts_; int64_t category_limitation_ts_; int64_t tenant_limitation_ts_; @@ -287,6 +286,7 @@ public: int64_t category_limitation_pos_; int64_t tenant_limitation_pos_; int64_t proportion_pos_; + IOReqList req_list_; }; class ObIOHandle final diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index 853d832ee5..05aa809b45 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -634,12 +634,40 @@ void ObIOTuner::print_io_status() } } -/****************** ObPhyQueue **********************/ -ObTenantPhyQueues::ObTenantPhyQueues() +/****************** ObIOCategoryQueues **********************/ +ObIOCategoryQueues::ObIOCategoryQueues() + : is_inited_(false) { } +ObIOCategoryQueues::~ObIOCategoryQueues() +{ + destroy(); +} + +int ObIOCategoryQueues::init() +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + LOG_WARN("phy queue init twice", K(ret), K(is_inited_)); + } else { + for (int64_t i = 0; OB_SUCC(ret) && i < static_cast(ObIOCategory::MAX_CATEGORY) + 1; ++i) { + if (OB_FAIL(phy_queues_[i].init(i))){ + LOG_WARN("phy queue init failed", K(ret)); + } + } + } + if (OB_SUCC(ret)) { + is_inited_ = true; + } + return ret; +} +void ObIOCategoryQueues::destroy() +{ + is_inited_ = false; +} /****************** IOScheduleQueue **********************/ ObIOSender::ObIOSender(ObIAllocator &allocator) : is_inited_(false), @@ -693,11 +721,9 @@ struct DestroyPhyqueueMapFn { public: DestroyPhyqueueMapFn(ObIAllocator &allocator) : allocator_(allocator) {} - int operator () (hash::HashMapPair &entry) { + int operator () (hash::HashMapPair &entry) { if (nullptr != entry.second) { - for (int64_t i = 0; i < static_cast(ObIOCategory::MAX_CATEGORY) + 1; ++i) { - entry.second->phy_queues_[i].destroy(); - } + entry.second->~ObIOCategoryQueues(); allocator_.free(entry.second); } return OB_SUCCESS; @@ -797,7 +823,7 @@ int ObIOSender::alloc_mclock_queue(ObIAllocator &allocator, ObMClockQueue *&io_q int ObIOSender::enqueue_request(ObIORequest &req) { int ret = OB_SUCCESS; - ObTenantPhyQueues *tenant_phy_queues = nullptr; + ObIOCategoryQueues *io_category_queues = nullptr; ObIORequest *tmp_req = &req; if (!is_inited_) { ret = OB_NOT_INIT; @@ -807,18 +833,19 @@ int ObIOSender::enqueue_request(ObIORequest &req) if (OB_FAIL(cond_guard.get_ret())) { LOG_ERROR("guard queue condition failed", K(ret)); } else { - if (OB_FAIL(tenant_map_.get_refactored(tmp_req->io_info_.tenant_id_, tenant_phy_queues))) { + if (OB_FAIL(tenant_map_.get_refactored(tmp_req->io_info_.tenant_id_, io_category_queues))) { LOG_WARN("get_refactored tenant_map failed", K(ret), K(req)); } else { const int index = static_cast(tmp_req->get_category()); - ObPhyQueue *tmp_phy_queue = &(tenant_phy_queues->phy_queues_[index]); - if (tmp_phy_queue->phy_queue_.get_total() <= 0) { + ObPhyQueue *tmp_phy_queue = &(io_category_queues->phy_queues_[index]); + if (tmp_phy_queue->req_list_.is_empty()) { //new request if (OB_FAIL(io_queue_->remove_from_heap(tmp_phy_queue))) { LOG_WARN("remove phy queue from heap failed", K(ret), K(index)); } else { - req.inc_ref("phyqueue_inc"); //ref for phy_queue - if (OB_FAIL(tmp_phy_queue->phy_queue_.push(tmp_req))) { + req.inc_ref("phyqueue_inc"); //ref for phy_queue + if (OB_UNLIKELY(!tmp_phy_queue->req_list_.add_last(tmp_req))) { + ret = OB_ERR_UNEXPECTED; req.dec_ref("phyqueue_dec"); //ref for phy_queue tmp_phy_queue->reset_time_info(); LOG_WARN("push new req into phy queue failed", K(ret)); @@ -846,7 +873,8 @@ int ObIOSender::enqueue_request(ObIORequest &req) } else { //not new req, into phy_queue and line up req.inc_ref("phyqueue_inc"); //ref for phy_queue - if (OB_FAIL(tmp_phy_queue->phy_queue_.push(tmp_req))) { + if (OB_UNLIKELY(!tmp_phy_queue->req_list_.add_last(tmp_req))) { + ret = OB_ERR_UNEXPECTED; LOG_WARN("req line up failed", K(req)); req.dec_ref("phyqueue_dec"); //ref for phy_queue } else { @@ -865,7 +893,7 @@ int ObIOSender::enqueue_request(ObIORequest &req) return ret; } -int ObIOSender::enqueue_phy_queue(ObPhyQueue *phyqueue) +int ObIOSender::enqueue_phy_queue(ObPhyQueue &phyqueue) { int ret = OB_SUCCESS; if (!is_inited_) { @@ -876,7 +904,7 @@ int ObIOSender::enqueue_phy_queue(ObPhyQueue *phyqueue) if (OB_FAIL(cond_guard.get_ret())) { LOG_ERROR("guard queue condition failed", K(ret)); } else { - if (OB_FAIL(io_queue_->push_phyqueue(phyqueue))) { + if (OB_FAIL(io_queue_->push_phyqueue(&phyqueue))) { LOG_WARN("push phyqueue into queue failed", K(ret)); } else { if (OB_FAIL(queue_cond_.signal())) { @@ -930,20 +958,19 @@ int ObIOSender::remove_phy_queue(const uint64_t tenant_id) if (OB_FAIL(cond_guard.get_ret())) { LOG_ERROR("guard queue condition failed", K(ret)); } else { - ObTenantPhyQueues *tenant_phy_queues = nullptr; - if (OB_FAIL(tenant_map_.erase_refactored(tenant_id, &tenant_phy_queues))) { + ObIOCategoryQueues *io_category_queues = nullptr; + if (OB_FAIL(tenant_map_.erase_refactored(tenant_id, &io_category_queues))) { LOG_WARN("erase phy_queues failed", K(ret), K(tenant_id)); - } else if (nullptr != tenant_phy_queues) { + } else if (nullptr != io_category_queues) { for (int64_t j = 0; OB_SUCC(ret) && j < static_cast(ObIOCategory::MAX_CATEGORY) + 1; ++j) { - ObPhyQueue *tmp_phy_queue = &(tenant_phy_queues->phy_queues_[j]); + ObPhyQueue *tmp_phy_queue = &(io_category_queues->phy_queues_[j]); if (OB_FAIL(io_queue_->remove_from_heap(tmp_phy_queue))) { LOG_WARN("remove phy queue from heap failed", K(ret)); - } else { - tmp_phy_queue->~ObPhyQueue(); } } if (OB_SUCC(ret)) { - allocator_.free(tenant_phy_queues); + io_category_queues->~ObIOCategoryQueues(); + allocator_.free(io_category_queues); } } } @@ -1202,29 +1229,29 @@ int ObIOScheduler::add_tenant_map(uint64_t tenant_id) int ret = OB_SUCCESS; for (int64_t i = 0; OB_SUCC(ret) && i < senders_.count(); ++i) { ObIOSender *cur_sender = senders_.at(i); - ObTenantPhyQueues *tenant_phy_queues = nullptr; + ObIOCategoryQueues *io_category_queues = nullptr; void *buf_queues = nullptr; - if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObTenantPhyQueues)))) { + if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObIOCategoryQueues)))) { ret = OB_ALLOCATE_MEMORY_FAILED; - LOG_WARN("allocate tenant phyqueues memory failed", K(ret)); + LOG_WARN("allocate phyqueues memory failed", K(ret)); } else { - tenant_phy_queues = new (buf_queues) ObTenantPhyQueues; - for(int64_t j = 0; OB_SUCC(ret) && j < static_cast(ObIOCategory::MAX_CATEGORY) + 1; j++) { - if (OB_FAIL(tenant_phy_queues->phy_queues_[j].init(j))){ - LOG_WARN("phy queue init failed", K(ret)); - } else if (OB_FAIL(cur_sender->enqueue_phy_queue(&(tenant_phy_queues->phy_queues_[j])))) { - LOG_WARN("new phy_queue into send_queue failed", K(ret)); + io_category_queues = new (buf_queues) ObIOCategoryQueues(); + if (OB_FAIL(io_category_queues->init())) { + LOG_WARN("init phyqueues failed", K(ret)); + } else { + for(int64_t j = 0; OB_SUCC(ret) && j < static_cast(ObIOCategory::MAX_CATEGORY) + 1; j++) { + if (OB_FAIL(cur_sender->enqueue_phy_queue(io_category_queues->phy_queues_[j]))) { + LOG_WARN("new phy_queue into send_queue failed", K(ret)); + } } } if (OB_SUCC(ret)) { - if (OB_FAIL(cur_sender->tenant_map_.set_refactored(tenant_id, tenant_phy_queues))) { + if (OB_FAIL(cur_sender->tenant_map_.set_refactored(tenant_id, io_category_queues))) { LOG_WARN("init tenant map failed", K(ret), K(i)); } } else { - for(int64_t j = 0; OB_SUCC(ret) && j < static_cast(ObIOCategory::MAX_CATEGORY) + 1; j++) { - tenant_phy_queues->phy_queues_[j].~ObPhyQueue(); - } - cur_sender->allocator_.free(tenant_phy_queues); + io_category_queues->~ObIOCategoryQueues(); + cur_sender->allocator_.free(io_category_queues); } } } diff --git a/src/share/io/ob_io_struct.h b/src/share/io/ob_io_struct.h index a1ee2e54de..a573e4f845 100644 --- a/src/share/io/ob_io_struct.h +++ b/src/share/io/ob_io_struct.h @@ -187,11 +187,14 @@ private: ObIOScheduler &io_scheduler_; }; -struct ObTenantPhyQueues final { +struct ObIOCategoryQueues final { public: - ObTenantPhyQueues(); - ~ObTenantPhyQueues(); -public: + ObIOCategoryQueues(); + ~ObIOCategoryQueues(); + int init(); + void destroy(); +public: + bool is_inited_; ObPhyQueue phy_queues_[static_cast(ObIOCategory::MAX_CATEGORY) + 1]; }; @@ -212,7 +215,7 @@ public: int alloc_mclock_queue(ObIAllocator &allocator, ObMClockQueue *&io_queue); int enqueue_request(ObIORequest &req); - int enqueue_phy_queue(ObPhyQueue *phyqueue); + int enqueue_phy_queue(ObPhyQueue &phyqueue); int dequeue_request(ObIORequest *&req); int remove_phy_queue(const uint64_t tenant_id); int notify(); @@ -229,7 +232,7 @@ public: int tg_id_; // thread group id ObMClockQueue *io_queue_; ObThreadCond queue_cond_; - hash::ObHashMap tenant_map_; + hash::ObHashMap tenant_map_; int64_t sender_req_count_; }; diff --git a/unittest/storage/test_io_manager.cpp b/unittest/storage/test_io_manager.cpp index 48883882b6..9f88be1ac1 100644 --- a/unittest/storage/test_io_manager.cpp +++ b/unittest/storage/test_io_manager.cpp @@ -560,7 +560,8 @@ TEST_F(TestIOStruct, IOCallbackManager) ASSERT_FAIL(callback_mgr.enqueue_callback(req)); char buf[32] = "test"; req.io_buf_ = buf; - req.copied_callback_ = new (req.callback_buf_) TestIOCallback(); + char callback_buf_[ObIOCallback::CALLBACK_BUF_SIZE] __attribute__ ((aligned (16))); + req.copied_callback_ = new (callback_buf_) TestIOCallback(); // ObIOManager::get_instance().io_config_ = ObIOConfig::default_config(); ASSERT_SUCC(callback_mgr.enqueue_callback(req));