[master] mv ObIOTimeLog from ObIORequest to ObIOResult

This commit is contained in:
zhjc1124 2024-06-17 09:31:37 +00:00 committed by ob-robot
parent 7d2ca20fd7
commit 106d246f51
3 changed files with 48 additions and 22 deletions

View File

@ -430,8 +430,9 @@ ObIOResult::ObIOResult()
user_data_buf_(nullptr),
io_callback_(nullptr),
flag_(),
ret_code_(),
cond_()
cond_(),
time_log_(),
ret_code_()
{
}
@ -509,6 +510,7 @@ void ObIOResult::reset()
io_callback_ = nullptr;
flag_.reset();
ret_code_.reset();
time_log_.reset();
tenant_io_mgr_.reset();
//do not destroy thread_cond
is_inited_ = false;
@ -532,6 +534,7 @@ void ObIOResult::destroy()
io_callback_ = nullptr;
flag_.reset();
ret_code_.reset();
time_log_.reset();
cond_.destroy();
tenant_io_mgr_.reset();
is_inited_ = false;
@ -739,7 +742,6 @@ ObIORequest::ObIORequest()
tenant_id_(OB_INVALID_TENANT_ID),
tenant_io_mgr_(),
fd_(),
time_log_(),
trace_id_()
{
@ -830,7 +832,6 @@ void ObIORequest::reset() //only for test, not dec resut_ref
free_io_buffer();
ref_cnt_ = 0;
trace_id_.reset();
time_log_.reset();
io_result_ = nullptr;
fd_.reset();
tenant_io_mgr_.reset();
@ -849,7 +850,6 @@ void ObIORequest::destroy()
free_io_buffer();
ref_cnt_ = 0;
trace_id_.reset();
time_log_.reset();
if (nullptr != io_result_) {
io_result_->dec_ref("request");
io_result_ = nullptr;
@ -1966,7 +1966,9 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
ret = OB_ERR_UNEXPECTED;
LOG_WARN("request is null", K(ret), KP(req));
} else {
req->io_result_ == nullptr ? : req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
if (OB_NOT_NULL(req->io_result_)) {
req->io_result_->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
}
if (tmp_phy_queue->req_list_.is_empty()) {
tmp_phy_queue->reset_time_info();
tmp_phy_queue->last_empty_ts_ = ObTimeUtility::fast_current_time();
@ -2093,7 +2095,9 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
ret = OB_ERR_UNEXPECTED;
LOG_WARN("req is null", K(ret), KP(req));
} else {
req->io_result_ == nullptr ? : req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
if (OB_NOT_NULL(req->io_result_)) {
req->io_result_->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
}
LOG_DEBUG("req pop from phy queue succcess(P schedule)", KP(req), K(iter_count), "time_cost", ObTimeUtility::fast_current_time() - current_ts, K(ready_heap_.count()), K(current_ts));
if (tmp_phy_queue->req_list_.is_empty()) {
tmp_phy_queue->reset_time_info();

View File

@ -320,7 +320,7 @@ public:
TO_STRING_KV(K(is_inited_), K(is_finished_), K(is_canceled_), K(has_estimated_), K(complete_size_), K(offset_), K(size_),
K(timeout_us_), K(result_ref_cnt_), K(out_ref_cnt_), K(flag_), K(ret_code_), K(tenant_io_mgr_),
KP(user_data_buf_), KP(buf_), KP(io_callback_), K(begin_ts_), K(end_ts_));
KP(user_data_buf_), KP(buf_), KP(io_callback_), K(begin_ts_), K(end_ts_), K_(time_log));
DISALLOW_COPY_AND_ASSIGN(ObIOResult);
private:
@ -348,8 +348,10 @@ private:
char *user_data_buf_; //actual data buf without cb, allocated by thpe calling layer
ObIOCallback *io_callback_;
ObIOFlag flag_;
ObIORetCode ret_code_;
ObThreadCond cond_;
public:
ObIOTimeLog time_log_;
ObIORetCode ret_code_;
};
class ObIORequest : public common::ObDLinkBase<ObIORequest>
@ -387,7 +389,7 @@ public:
void dec_ref(const char *msg = nullptr);
TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(control_block_), K(ref_cnt_), KP(raw_buf_), K(fd_),
K(trace_id_), K(retry_count_), K(tenant_io_mgr_), K(time_log_), KPC(io_result_));
K(trace_id_), K(retry_count_), K(tenant_io_mgr_), KPC(io_result_));
private:
friend class ObIOResult;
friend class ObIOSender;
@ -414,7 +416,6 @@ private:
uint64_t tenant_id_;
ObRefHolder<ObTenantIOManager> tenant_io_mgr_;
ObIOFd fd_;
ObIOTimeLog time_log_;
ObCurTraceId::TraceId trace_id_;
};

View File

@ -532,8 +532,11 @@ void ObIOUsage::accumulate(ObIOResult &result, ObIORequest &request)
int32_t io_offset = 0;
int64_t io_size = 0;
result.calc_io_offset_and_size(io_size, io_offset);
if (request.time_log_.return_ts_ > 0) {
const int64_t device_delay = get_io_interval(request.time_log_.return_ts_, request.time_log_.submit_ts_);
if (OB_NOT_NULL(request.io_result_)
&& request.io_result_->time_log_.return_ts_ > 0
&& request.io_result_->ret_code_.io_ret_ == 0) {
const int64_t device_delay = get_io_interval(request.io_result_->time_log_.return_ts_,
request.io_result_->time_log_.submit_ts_);
io_stats_.at(result.get_io_usage_index()).at(static_cast<int>(result.get_mode()))
.accumulate(1, io_size, device_delay);
}
@ -678,12 +681,12 @@ void ObSysIOUsage::accumulate(ObIOResult &result, ObIORequest &request)
{
if (OB_UNLIKELY(!request.is_sys_module())) {
// ignore
} else if (request.time_log_.return_ts_ > 0) {
} else if (request.io_result_->time_log_.return_ts_ > 0) {
int32_t io_offset = 0;
int64_t io_size = 0;
result.calc_io_offset_and_size(io_size, io_offset);
const uint64_t idx = result.get_sys_module_id() - SYS_RESOURCE_GROUP_START_ID;
const int64_t device_delay = get_io_interval(request.time_log_.return_ts_, request.time_log_.submit_ts_);
const int64_t device_delay = get_io_interval(request.io_result_->time_log_.return_ts_, request.io_result_->time_log_.submit_ts_);
io_stats_.at(idx).at(static_cast<int>(result.get_mode()))
.accumulate(1, io_size, device_delay);
}
@ -1200,7 +1203,9 @@ int ObIOSender::enqueue_request(ObIORequest &req)
LOG_WARN("push new req into phy queue failed", K(ret));
} else {
ATOMIC_INC(&sender_req_count_);
req.time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time();
if (OB_NOT_NULL(req.io_result_)) {
req.io_result_->time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time();
}
//calc ts_
if (OB_NOT_NULL(req.tenant_io_mgr_.get_ptr())) {
ObTenantIOClock *io_clock = static_cast<ObTenantIOClock *>(req.tenant_io_mgr_.get_ptr()->get_io_clock());
@ -1231,7 +1236,9 @@ int ObIOSender::enqueue_request(ObIORequest &req)
req.dec_ref("phyqueue_dec"); //ref for phy_queue
} else {
ATOMIC_INC(&sender_req_count_);
req.time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time();
if (OB_NOT_NULL(req.io_result_)) {
req.io_result_->time_log_.enqueue_ts_ = ObTimeUtility::fast_current_time();
}
}
}
if (OB_SUCC(ret)) {
@ -2081,7 +2088,9 @@ int ObAsyncIOChannel::submit(ObIORequest &req)
int64_t io_size = 0;
req.calc_io_offset_and_size(io_size, io_offset);
ATOMIC_FAA(&device_channel_->used_io_depth_, get_io_depth(io_size));
req.time_log_.submit_ts_ = ObTimeUtility::current_time();
if (OB_NOT_NULL(req.io_result_)) {
req.io_result_->time_log_.submit_ts_ = ObTimeUtility::fast_current_time();
}
req.inc_ref("os_inc"); // ref for file system
if (OB_FAIL(device_handle_->io_submit(io_context_, req.control_block_))) {
ATOMIC_DEC(&submit_count_);
@ -2100,7 +2109,10 @@ void ObAsyncIOChannel::cancel(ObIORequest &req)
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (0 != req.time_log_.submit_ts_ && 0 == req.time_log_.return_ts_) {
} else if (OB_ISNULL(req.io_result_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("io result is null", KR(ret), K(req));
} else if (0 != req.io_result_->time_log_.submit_ts_ && 0 == req.io_result_->time_log_.return_ts_) {
// Note: here if ob_io_cancel failed (possibly due to kernel not supporting io_cancel),
// neither we or the get_events thread would call control.callback_->process(),
// as we previously set need_callback to false.
@ -2145,7 +2157,9 @@ void ObAsyncIOChannel::get_events()
} else {
RequestHolder holder(req);
req->dec_ref("os_dec"); // ref for file system
req->time_log_.return_ts_ = io_return_time;
if (OB_NOT_NULL(req->io_result_)) {
req->io_result_->time_log_.return_ts_ = ObTimeUtility::fast_current_time();
}
int64_t io_offset = 0;
int64_t io_size = 0;
req->calc_io_offset_and_size(io_size, io_offset);
@ -2451,13 +2465,15 @@ int ObSyncIOChannel::do_sync_io(ObIORequest &req)
{
int ret = OB_SUCCESS;
int64_t io_size = 0;
int64_t io_offset = static_cast<int64_t>(req.io_result_->offset_);
int64_t io_offset = 0;
if (OB_ISNULL(device_handle_)) {
ret = OB_ERR_SYS;
LOG_WARN("device handle is null", K(ret));
} else if (OB_ISNULL(req.io_result_)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("io result is null", K(ret));
} else if (FALSE_IT(io_offset = static_cast<int64_t>(req.io_result_->offset_))) {
} else if (FALSE_IT(req.io_result_->time_log_.submit_ts_ = ObTimeUtility::fast_current_time())) {
} else if (req.get_flag().is_read()) {
if (OB_FAIL(device_handle_->pread(req.fd_, io_offset, req.io_result_->size_, req.calc_io_buf(), io_size))) {
LOG_WARN("pread failed", K(ret), K(req));
@ -2470,6 +2486,9 @@ int ObSyncIOChannel::do_sync_io(ObIORequest &req)
ret = OB_NOT_SUPPORTED;
LOG_WARN("not supported io mode", K(ret), K(req));
}
if (OB_NOT_NULL(req.io_result_)) {
req.io_result_->time_log_.return_ts_ = ObTimeUtility::fast_current_time();
}
if (OB_SUCC(ret)) {
req.io_result_->complete_size_ = static_cast<int32_t>(io_size);
if (!req.is_canceled() && req.can_callback()) {
@ -2759,7 +2778,9 @@ int ObIORunner::push(ObIORequest &req)
LOG_WARN("Fail to enqueue callback", K(ret));
req.dec_ref("cb_dec"); // ref for callback queue
} else {
req.time_log_.callback_enqueue_ts_ = ObTimeUtility::fast_current_time();
if (OB_NOT_NULL(req.io_result_)) {
req.io_result_->time_log_.callback_enqueue_ts_ = ObTimeUtility::fast_current_time();
}
// the request has been pushed in queue, not set return code anymore
int tmp_ret = OB_SUCCESS;
ObThreadCondGuard guard(cond_);