From 106d246f511cce56493cbd153999fafa1fa074fc Mon Sep 17 00:00:00 2001 From: zhjc1124 Date: Mon, 17 Jun 2024 09:31:37 +0000 Subject: [PATCH] [master] mv ObIOTimeLog from ObIORequest to ObIOResult --- src/share/io/ob_io_define.cpp | 18 +++++++++------ src/share/io/ob_io_define.h | 9 ++++---- src/share/io/ob_io_struct.cpp | 43 ++++++++++++++++++++++++++--------- 3 files changed, 48 insertions(+), 22 deletions(-) diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index 735aaf4ae..8bf6a6b2d 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -430,8 +430,9 @@ ObIOResult::ObIOResult() user_data_buf_(nullptr), io_callback_(nullptr), flag_(), - ret_code_(), - cond_() + cond_(), + time_log_(), + ret_code_() { } @@ -509,6 +510,7 @@ void ObIOResult::reset() io_callback_ = nullptr; flag_.reset(); ret_code_.reset(); + time_log_.reset(); tenant_io_mgr_.reset(); //do not destroy thread_cond is_inited_ = false; @@ -532,6 +534,7 @@ void ObIOResult::destroy() io_callback_ = nullptr; flag_.reset(); ret_code_.reset(); + time_log_.reset(); cond_.destroy(); tenant_io_mgr_.reset(); is_inited_ = false; @@ -739,7 +742,6 @@ ObIORequest::ObIORequest() tenant_id_(OB_INVALID_TENANT_ID), tenant_io_mgr_(), fd_(), - time_log_(), trace_id_() { @@ -830,7 +832,6 @@ void ObIORequest::reset() //only for test, not dec resut_ref free_io_buffer(); ref_cnt_ = 0; trace_id_.reset(); - time_log_.reset(); io_result_ = nullptr; fd_.reset(); tenant_io_mgr_.reset(); @@ -849,7 +850,6 @@ void ObIORequest::destroy() free_io_buffer(); ref_cnt_ = 0; trace_id_.reset(); - time_log_.reset(); if (nullptr != io_result_) { io_result_->dec_ref("request"); io_result_ = nullptr; @@ -1966,7 +1966,9 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts) ret = OB_ERR_UNEXPECTED; LOG_WARN("request is null", K(ret), KP(req)); } else { - req->io_result_ == nullptr ? : req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); + if (OB_NOT_NULL(req->io_result_)) { + req->io_result_->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); + } if (tmp_phy_queue->req_list_.is_empty()) { tmp_phy_queue->reset_time_info(); tmp_phy_queue->last_empty_ts_ = ObTimeUtility::fast_current_time(); @@ -2093,7 +2095,9 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& ret = OB_ERR_UNEXPECTED; LOG_WARN("req is null", K(ret), KP(req)); } else { - req->io_result_ == nullptr ? : req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); + if (OB_NOT_NULL(req->io_result_)) { + req->io_result_->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time(); + } LOG_DEBUG("req pop from phy queue succcess(P schedule)", KP(req), K(iter_count), "time_cost", ObTimeUtility::fast_current_time() - current_ts, K(ready_heap_.count()), K(current_ts)); if (tmp_phy_queue->req_list_.is_empty()) { tmp_phy_queue->reset_time_info(); diff --git a/src/share/io/ob_io_define.h b/src/share/io/ob_io_define.h index 27f6dce64..aa7f04e23 100644 --- a/src/share/io/ob_io_define.h +++ b/src/share/io/ob_io_define.h @@ -320,7 +320,7 @@ public: TO_STRING_KV(K(is_inited_), K(is_finished_), K(is_canceled_), K(has_estimated_), K(complete_size_), K(offset_), K(size_), K(timeout_us_), K(result_ref_cnt_), K(out_ref_cnt_), K(flag_), K(ret_code_), K(tenant_io_mgr_), - KP(user_data_buf_), KP(buf_), KP(io_callback_), K(begin_ts_), K(end_ts_)); + KP(user_data_buf_), KP(buf_), KP(io_callback_), K(begin_ts_), K(end_ts_), K_(time_log)); DISALLOW_COPY_AND_ASSIGN(ObIOResult); private: @@ -348,8 +348,10 @@ private: char *user_data_buf_; //actual data buf without cb, allocated by thpe calling layer ObIOCallback *io_callback_; ObIOFlag flag_; - ObIORetCode ret_code_; ObThreadCond cond_; +public: + ObIOTimeLog time_log_; + ObIORetCode ret_code_; }; class ObIORequest : public common::ObDLinkBase @@ -387,7 +389,7 @@ public: void dec_ref(const char *msg = nullptr); TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(control_block_), K(ref_cnt_), KP(raw_buf_), K(fd_), - K(trace_id_), K(retry_count_), K(tenant_io_mgr_), K(time_log_), KPC(io_result_)); + K(trace_id_), K(retry_count_), K(tenant_io_mgr_), KPC(io_result_)); private: friend class ObIOResult; friend class ObIOSender; @@ -414,7 +416,6 @@ private: uint64_t tenant_id_; ObRefHolder tenant_io_mgr_; ObIOFd fd_; - ObIOTimeLog time_log_; ObCurTraceId::TraceId trace_id_; }; diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index d7dc21787..0e98af76f 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -532,8 +532,11 @@ void ObIOUsage::accumulate(ObIOResult &result, ObIORequest &request) int32_t io_offset = 0; int64_t io_size = 0; result.calc_io_offset_and_size(io_size, io_offset); - if (request.time_log_.return_ts_ > 0) { - const int64_t device_delay = get_io_interval(request.time_log_.return_ts_, request.time_log_.submit_ts_); + if (OB_NOT_NULL(request.io_result_) + && request.io_result_->time_log_.return_ts_ > 0 + && request.io_result_->ret_code_.io_ret_ == 0) { + const int64_t device_delay = get_io_interval(request.io_result_->time_log_.return_ts_, + request.io_result_->time_log_.submit_ts_); io_stats_.at(result.get_io_usage_index()).at(static_cast(result.get_mode())) .accumulate(1, io_size, device_delay); } @@ -678,12 +681,12 @@ void ObSysIOUsage::accumulate(ObIOResult &result, ObIORequest &request) { if (OB_UNLIKELY(!request.is_sys_module())) { // ignore - } else if (request.time_log_.return_ts_ > 0) { + } else if (request.io_result_->time_log_.return_ts_ > 0) { int32_t io_offset = 0; int64_t io_size = 0; result.calc_io_offset_and_size(io_size, io_offset); const uint64_t idx = result.get_sys_module_id() - SYS_RESOURCE_GROUP_START_ID; - const int64_t device_delay = get_io_interval(request.time_log_.return_ts_, request.time_log_.submit_ts_); + const int64_t device_delay = get_io_interval(request.io_result_->time_log_.return_ts_, request.io_result_->time_log_.submit_ts_); io_stats_.at(idx).at(static_cast(result.get_mode())) .accumulate(1, io_size, device_delay); } @@ -1200,7 +1203,9 @@ int ObIOSender::enqueue_request(ObIORequest &req) LOG_WARN("push new req into phy queue failed", K(ret)); } else { ATOMIC_INC(&sender_req_count_); - req.time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time(); + if (OB_NOT_NULL(req.io_result_)) { + req.io_result_->time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time(); + } //calc ts_ if (OB_NOT_NULL(req.tenant_io_mgr_.get_ptr())) { ObTenantIOClock *io_clock = static_cast(req.tenant_io_mgr_.get_ptr()->get_io_clock()); @@ -1231,7 +1236,9 @@ int ObIOSender::enqueue_request(ObIORequest &req) req.dec_ref("phyqueue_dec"); //ref for phy_queue } else { ATOMIC_INC(&sender_req_count_); - req.time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time(); + if (OB_NOT_NULL(req.io_result_)) { + req.io_result_->time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time(); + } } } if (OB_SUCC(ret)) { @@ -2081,7 +2088,9 @@ int ObAsyncIOChannel::submit(ObIORequest &req) int64_t io_size = 0; req.calc_io_offset_and_size(io_size, io_offset); ATOMIC_FAA(&device_channel_->used_io_depth_, get_io_depth(io_size)); - req.time_log_.submit_ts_ = ObTimeUtility::current_time(); + if (OB_NOT_NULL(req.io_result_)) { + req.io_result_->time_log_.submit_ts_ = ObTimeUtility::fast_current_time(); + } req.inc_ref("os_inc"); // ref for file system if (OB_FAIL(device_handle_->io_submit(io_context_, req.control_block_))) { ATOMIC_DEC(&submit_count_); @@ -2100,7 +2109,10 @@ void ObAsyncIOChannel::cancel(ObIORequest &req) if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); - } else if (0 != req.time_log_.submit_ts_ && 0 == req.time_log_.return_ts_) { + } else if (OB_ISNULL(req.io_result_)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("io result is null", KR(ret), K(req)); + } else if (0 != req.io_result_->time_log_.submit_ts_ && 0 == req.io_result_->time_log_.return_ts_) { // Note: here if ob_io_cancel failed (possibly due to kernel not supporting io_cancel), // neither we or the get_events thread would call control.callback_->process(), // as we previously set need_callback to false. @@ -2145,7 +2157,9 @@ void ObAsyncIOChannel::get_events() } else { RequestHolder holder(req); req->dec_ref("os_dec"); // ref for file system - req->time_log_.return_ts_ = io_return_time; + if (OB_NOT_NULL(req->io_result_)) { + req->io_result_->time_log_.return_ts_ = ObTimeUtility::fast_current_time(); + } int64_t io_offset = 0; int64_t io_size = 0; req->calc_io_offset_and_size(io_size, io_offset); @@ -2451,13 +2465,15 @@ int ObSyncIOChannel::do_sync_io(ObIORequest &req) { int ret = OB_SUCCESS; int64_t io_size = 0; - int64_t io_offset = static_cast(req.io_result_->offset_); + int64_t io_offset = 0; if (OB_ISNULL(device_handle_)) { ret = OB_ERR_SYS; LOG_WARN("device handle is null", K(ret)); } else if (OB_ISNULL(req.io_result_)) { ret = OB_INVALID_ARGUMENT; LOG_WARN("io result is null", K(ret)); + } else if (FALSE_IT(io_offset = static_cast(req.io_result_->offset_))) { + } else if (FALSE_IT(req.io_result_->time_log_.submit_ts_ = ObTimeUtility::fast_current_time())) { } else if (req.get_flag().is_read()) { if (OB_FAIL(device_handle_->pread(req.fd_, io_offset, req.io_result_->size_, req.calc_io_buf(), io_size))) { LOG_WARN("pread failed", K(ret), K(req)); @@ -2470,6 +2486,9 @@ int ObSyncIOChannel::do_sync_io(ObIORequest &req) ret = OB_NOT_SUPPORTED; LOG_WARN("not supported io mode", K(ret), K(req)); } + if (OB_NOT_NULL(req.io_result_)) { + req.io_result_->time_log_.return_ts_ = ObTimeUtility::fast_current_time(); + } if (OB_SUCC(ret)) { req.io_result_->complete_size_ = static_cast(io_size); if (!req.is_canceled() && req.can_callback()) { @@ -2759,7 +2778,9 @@ int ObIORunner::push(ObIORequest &req) LOG_WARN("Fail to enqueue callback", K(ret)); req.dec_ref("cb_dec"); // ref for callback queue } else { - req.time_log_.callback_enqueue_ts_ = ObTimeUtility::fast_current_time(); + if (OB_NOT_NULL(req.io_result_)) { + req.io_result_->time_log_.callback_enqueue_ts_ = ObTimeUtility::fast_current_time(); + } // the request has been pushed in queue, not set return code anymore int tmp_ret = OB_SUCCESS; ObThreadCondGuard guard(cond_);