diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index 0b63725155..3de58dea2c 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -300,6 +300,7 @@ ObIORequest::ObIORequest() has_estimated_(false), io_info_(), deadline_ts_(0), + sender_index_(0), control_block_(nullptr), raw_buf_(nullptr), io_buf_(nullptr), @@ -375,6 +376,7 @@ void ObIORequest::destroy() is_canceled_ = false; has_estimated_ = false; deadline_ts_ = 0; + sender_index_ = 0; if (nullptr != control_block_ && nullptr != io_info_.fd_.device_handle_) { io_info_.fd_.device_handle_->free_iocb(control_block_); control_block_ = nullptr; @@ -1277,7 +1279,7 @@ void ObAtomIOClock::reset() ObMClockQueue::ObMClockQueue() : is_inited_(false), r_heap_(r_cmp_), - cl_heap_(cl_cmp_), + gl_heap_(gl_cmp_), tl_heap_(tl_cmp_), ready_heap_(p_cmp_) { @@ -1309,6 +1311,30 @@ void ObMClockQueue::destroy() is_inited_ = false; } +int ObMClockQueue::get_time_info(int64_t &reservation_ts, + int64_t &group_limitation_ts, + int64_t &tenant_limitation_ts, + int64_t &proportion_ts) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_inited_ || r_heap_.empty())) { + ret = OB_NOT_INIT; + LOG_WARN("not init yet", K(ret), K(is_inited_)); + } else { + ObPhyQueue *r_phy_queue = r_heap_.top(); + if (OB_ISNULL(r_phy_queue)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("phy_queue is null", K(ret)); + } else { + reservation_ts = r_phy_queue->reservation_ts_; + group_limitation_ts = gl_heap_.empty() ? INT64_MAX : gl_heap_.top()->group_limitation_ts_; + tenant_limitation_ts = tl_heap_.empty() ? INT64_MAX : tl_heap_.top()->tenant_limitation_ts_; + proportion_ts = ready_heap_.empty() ? INT64_MAX : ready_heap_.top()->proportion_ts_; + } + } + return ret; +} + int ObMClockQueue::push_phyqueue(ObPhyQueue *phy_queue) { int ret = OB_SUCCESS; @@ -1321,7 +1347,7 @@ int ObMClockQueue::push_phyqueue(ObPhyQueue *phy_queue) LOG_WARN("phy_queue is null", K(ret), KP(phy_queue)); } else if (OB_FAIL(r_heap_.push(phy_queue))) { LOG_WARN("push r heap failed", K(ret)); - } else if (OB_FAIL(cl_heap_.push(phy_queue))) { + } else if (OB_FAIL(gl_heap_.push(phy_queue))) { LOG_WARN("push cl heap failed", K(ret)); int tmp_ret = r_heap_.remove(phy_queue); if (OB_SUCCESS != tmp_ret) { @@ -1401,7 +1427,7 @@ int ObMClockQueue::remove_from_heap(ObPhyQueue *phy_queue) } else if (OB_FAIL(r_heap_.remove(phy_queue))) { LOG_WARN("remove phy queue from r heap failed", K(ret)); } else if (!phy_queue->is_group_ready_ && !phy_queue->is_tenant_ready_) { - if (OB_FAIL(cl_heap_.remove(phy_queue))) { + if (OB_FAIL(gl_heap_.remove(phy_queue))) { LOG_WARN("remove phy queue from cl heap failed", K(ret)); } } else if (phy_queue->is_group_ready_ && !phy_queue->is_tenant_ready_) { @@ -1424,24 +1450,24 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& int64_t iter_count = 0; ObPhyQueue *tmp_phy_queue = nullptr; req = nullptr; - while (OB_SUCC(ret) && !cl_heap_.empty() && !cl_heap_.top()->req_list_.is_empty()) { - tmp_phy_queue = cl_heap_.top(); + while (OB_SUCC(ret) && !gl_heap_.empty() && !gl_heap_.top()->req_list_.is_empty()) { + tmp_phy_queue = gl_heap_.top(); deadline_ts = 0 == iter_count ? tmp_phy_queue->tenant_limitation_ts_ : deadline_ts; ++iter_count; if (tmp_phy_queue->group_limitation_ts_ > current_ts) { break; - } else if (OB_FAIL(cl_heap_.pop())) { + } else if (OB_FAIL(gl_heap_.pop())) { LOG_WARN("remove PhyQueue from c_limitation queue failed", K(ret)); } else { tmp_phy_queue->is_group_ready_ = true; if (tmp_phy_queue->tenant_limitation_ts_ <= current_ts) { tmp_phy_queue->is_tenant_ready_ = true; if (OB_FAIL(ready_heap_.push(tmp_phy_queue))) { - LOG_WARN("push phy_queue from cl_heap to ready_heap failed", K(ret)); + LOG_WARN("push phy_queue from gl_heap to ready_heap failed", K(ret)); } } else { if (OB_FAIL(tl_heap_.push(tmp_phy_queue))) { - LOG_WARN("push phy_queue from cl_heap to tl_heap failed", K(ret)); + LOG_WARN("push phy_queue from gl_heap to tl_heap failed", K(ret)); } } } diff --git a/src/share/io/ob_io_define.h b/src/share/io/ob_io_define.h index a91bc94282..14e64793df 100644 --- a/src/share/io/ob_io_define.h +++ b/src/share/io/ob_io_define.h @@ -220,7 +220,7 @@ public: void inc_out_ref(); void dec_out_ref(); VIRTUAL_TO_STRING_KV(K(is_inited_), K(is_finished_), K(is_canceled_), K(has_estimated_), K(io_info_), K(deadline_ts_), - KP(control_block_), KP(raw_buf_), KP(io_buf_), K(io_offset_), K(io_size_), K(complete_size_), + K(sender_index_), KP(control_block_), KP(raw_buf_), KP(io_buf_), K(io_offset_), K(io_size_), K(complete_size_), K(time_log_), KP(channel_), K(ref_cnt_), K(out_ref_cnt_), K(trace_id_), K(ret_code_), K(retry_count_), K(callback_buf_size_), KP(copied_callback_), K(tenant_io_mgr_)); private: @@ -232,6 +232,7 @@ public: bool has_estimated_; ObIOInfo io_info_; int64_t deadline_ts_; + int64_t sender_index_; ObIOCB *control_block_; void *raw_buf_; char *io_buf_; @@ -392,6 +393,10 @@ public: virtual ~ObMClockQueue(); virtual int init() override; virtual void destroy() override; + int get_time_info(int64_t &reservation_ts, + int64_t &group_limitation_ts, + int64_t &tenant_limitation_ts, + int64_t &proportion_ts); int push_phyqueue(ObPhyQueue *phy_queue); int pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts); TO_STRING_KV(K(is_inited_)); @@ -410,11 +415,11 @@ private: private: bool is_inited_; HeapCompare r_cmp_; - HeapCompare cl_cmp_; + HeapCompare gl_cmp_; HeapCompare tl_cmp_; HeapCompare p_cmp_; ObRemovableHeap, &ObPhyQueue::reservation_pos_> r_heap_; - ObRemovableHeap, &ObPhyQueue::group_limitation_pos_> cl_heap_; + ObRemovableHeap, &ObPhyQueue::group_limitation_pos_> gl_heap_; ObRemovableHeap, &ObPhyQueue::tenant_limitation_pos_> tl_heap_; ObRemovableHeap, &ObPhyQueue::proportion_pos_> ready_heap_; }; diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index c640b101ae..195c9d7a33 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -688,6 +688,7 @@ void ObIOTuner::run1() // print interval must <= 1s, for ensuring real_iops >= 1 in gv$ob_io_quota. if (REACH_TIME_INTERVAL(1000L * 1000L * 1L)) { print_io_status(); + print_sender_status(); } ob_usleep(100 * 1000); // 100ms } @@ -695,6 +696,29 @@ void ObIOTuner::run1() } } +void ObIOTuner::print_sender_status() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < io_scheduler_.senders_.count(); ++i) { + ObIOSender *sender = io_scheduler_.senders_.at(i); + if (OB_ISNULL(sender)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("io sender is null", K(ret), K(i)); + } else { + int64_t reservation_ts = 0; + int64_t group_limitation_ts = 0; + int64_t tenant_limitation_ts = 0; + int64_t proportion_ts = 0; + + ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts); + if (OB_NOT_INIT != ret) { + LOG_INFO("[IO SENDER STATUS]", "send_index", sender->sender_index_, "req_count", sender->get_queue_count(), + K(reservation_ts), K(group_limitation_ts), K(tenant_limitation_ts), K(proportion_ts)); + } + } + } +} + void ObIOTuner::print_io_status() { int ret = OB_SUCCESS; @@ -702,34 +726,18 @@ void ObIOTuner::print_io_status() if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) { LOG_WARN("get tenant id failed", K(ret)); } else if (tenant_ids.count() > 0) { - ObArray queue_count_array; - if (OB_FAIL(queue_count_array.reserve(io_scheduler_.senders_.count()))) { - LOG_WARN("reserve queue count array failed", K(ret), K(io_scheduler_.senders_.count())); - } - for (int64_t i = 0; OB_SUCC(ret) && i < io_scheduler_.senders_.count(); ++i) { - ObIOSender *sender = io_scheduler_.senders_.at(i); - if (OB_ISNULL(sender)) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("io sender is null", K(ret), K(i)); - } else if (OB_FAIL(queue_count_array.push_back(sender->get_queue_count()))) { - LOG_WARN("push back queue count failed", K(ret)); - } - } - if (OB_SUCC(ret)) { - LOG_INFO("[IO STATUS]", K(tenant_ids), "send_thread_count", io_scheduler_.senders_.count(), "send_queues", queue_count_array); - } - } - for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { - const uint64_t cur_tenant_id = tenant_ids.at(i); - ObRefHolder tenant_holder; - if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) { - if (OB_HASH_NOT_EXIST != ret) { - LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id)); + for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) { + const uint64_t cur_tenant_id = tenant_ids.at(i); + ObRefHolder tenant_holder; + if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) { + if (OB_HASH_NOT_EXIST != ret) { + LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id)); + } else { + ret = OB_SUCCESS; + } } else { - ret = OB_SUCCESS; + tenant_holder.get_ptr()->print_io_status(); } - } else { - tenant_holder.get_ptr()->print_io_status(); } } } @@ -815,6 +823,7 @@ ObSenderInfo::~ObSenderInfo() /****************** IOScheduleQueue **********************/ ObIOSender::ObIOSender(ObIAllocator &allocator) : sender_req_count_(0), + sender_index_(0), tg_id_(-1), is_inited_(false), stop_submit_(false), @@ -830,7 +839,7 @@ ObIOSender::~ObIOSender() destroy(); } -int ObIOSender::init() +int ObIOSender::init(const int64_t sender_index) { int ret = OB_SUCCESS; if (OB_UNLIKELY(is_inited_)) { @@ -849,6 +858,7 @@ int ObIOSender::init() } else { is_inited_ = true; sender_req_count_ = 0; + sender_index_ = sender_index; LOG_INFO("io sender init succ", KCSTRING(lbt())); } @@ -910,6 +920,7 @@ void ObIOSender::destroy() is_inited_ = false; stop_submit_ = false; sender_req_count_ = 0; + sender_index_ = 0; LOG_INFO("io sender destroyed", KCSTRING(lbt())); } @@ -1245,6 +1256,26 @@ int64_t ObIOSender::get_queue_count() const return OB_ISNULL(io_queue_) ? 0 : sender_req_count_; } +int ObIOSender::get_sender_info(int64_t &reservation_ts, + int64_t &group_limitation_ts, + int64_t &tenant_limitation_ts, + int64_t &proportion_ts) +{ + int ret = OB_SUCCESS; + if (!is_inited_) { + ret = OB_NOT_INIT; + LOG_WARN("Not init", K(ret), K(is_inited_)); + } else { + ObThreadCondGuard cond_guard(queue_cond_); + if (OB_FAIL(cond_guard.get_ret())) { + LOG_ERROR("guard queue condition failed", K(ret)); + } else { + ret = io_queue_->get_time_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts); + } + } + return ret; +} + int ObIOSender::get_sender_status(const uint64_t tenant_id, const uint64_t index, ObSenderInfo &sender_info) { int ret = OB_SUCCESS; @@ -1403,11 +1434,12 @@ int ObIOScheduler::init(const int64_t queue_count, const int64_t schedule_media_ for (int64_t i = 0; OB_SUCC(ret) && i < queue_count; ++i) { void *buf = nullptr; ObIOSender *tmp_sender = nullptr; + int64_t sender_index = i + 1; if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObIOSender)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); } else if (FALSE_IT(tmp_sender = new (buf) ObIOSender(allocator_))) { - } else if (OB_FAIL(tmp_sender->init())) { + } else if (OB_FAIL(tmp_sender->init(sender_index))) { LOG_WARN("init io sender failed", K(ret), K(i), K(*tmp_sender)); } else if (OB_FAIL(senders_.push_back(tmp_sender))) { LOG_WARN("push back io sender failed", K(ret), K(i), K(*tmp_sender)); @@ -2902,6 +2934,7 @@ void ObIOFaultDetector::set_device_warning() { last_device_warning_ts_ = ObTimeUtility::fast_current_time(); is_device_warning_ = true; + LOG_WARN_RET(OB_IO_ERROR, "disk maybe too slow"); } // set disk error and record error_ts diff --git a/src/share/io/ob_io_struct.h b/src/share/io/ob_io_struct.h index 402e3fd6cd..aa14502550 100644 --- a/src/share/io/ob_io_struct.h +++ b/src/share/io/ob_io_struct.h @@ -184,6 +184,7 @@ public: virtual void run1() override; private: + void print_sender_status(); void print_io_status(); private: bool is_inited_; @@ -224,7 +225,7 @@ class ObIOSender : public lib::TGRunnable public: ObIOSender(ObIAllocator &allocator); virtual ~ObIOSender(); - int init(); + int init(const int64_t sender_index); void stop(); void wait(); void destroy(); @@ -243,13 +244,18 @@ public: int stop_phy_queue(const uint64_t tenant_id, const uint64_t index); int notify(); int64_t get_queue_count() const; + int get_sender_info(int64_t &reservation_ts, + int64_t &group_limitation_ts, + int64_t &tenant_limitation_ts, + int64_t &proportion_ts); int get_sender_status(const uint64_t tenant_id, const uint64_t index, ObSenderInfo &sender_info); - TO_STRING_KV(K(is_inited_), K(stop_submit_), KPC(io_queue_), K(tg_id_)); + TO_STRING_KV(K(is_inited_), K(stop_submit_), KPC(io_queue_), K(tg_id_), K(sender_index_)); //private: void pop_and_submit(); int64_t calc_wait_timeout(const int64_t queue_deadline); int submit(ObIORequest &req); int64_t sender_req_count_; + int64_t sender_index_; int tg_id_; // thread group id bool is_inited_; bool stop_submit_;