Under concurrent conditions, requests from multiple tenants may hit the same sender
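
In short: ObIOScheduler::schedule_request now compares the two randomly sampled senders through get_queue_count(), which reads sender_req_count_ with ATOMIC_LOAD rather than a plain load, and ObTimeGuard checkpoints are added to ObMClockQueue::pop_phyqueue and ObIOSender::submit so slow scheduling phases show up in the logs.
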
@@ -1370,6 +1370,7 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
   } else if (r_heap_.empty()){
     ret = OB_ENTRY_NOT_EXIST;
   } else {
+    ObTimeGuard time_guard("pop_phyqueue", 100000); //100ms
     ObPhyQueue *tmp_phy_queue = r_heap_.top();
     if (OB_ISNULL(tmp_phy_queue)) {
       ret = OB_ERR_UNEXPECTED;
@@ -1380,6 +1381,7 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
       //R schedule
       if(OB_FAIL(remove_from_heap(tmp_phy_queue))) {
         LOG_WARN("remove phy queue from heap failed(R schedule)", K(ret));
+      } else if (FALSE_IT(time_guard.click("R_leave_heap"))) {
       } else {
         req = tmp_phy_queue->req_list_.remove_first();
         if (OB_ISNULL(req)) {
@@ -1397,10 +1399,12 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
           LOG_WARN("get null next_req", KP(next_req));
         } else if (OB_FAIL(io_clock->calc_phyqueue_clock(tmp_phy_queue, *next_req))) {
           LOG_WARN("calc phyqueue clock failed", K(ret), KPC(next_req));
+        } else if (FALSE_IT(time_guard.click("R_calc_clock"))) {
         }
       }
     }
     int tmp_ret = push_phyqueue(tmp_phy_queue);
+    time_guard.click("R_into_heap");
     if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
       LOG_WARN("re_into heap failed(R schedule)", K(tmp_ret));
       abort();
@@ -1412,6 +1416,8 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
       if (OB_EAGAIN != ret) {
         LOG_WARN("pop with ready queue failed", K(ret));
       }
+    } else {
+      time_guard.click("P_schedule");
     }
   }
 }
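
The "} else if (FALSE_IT(time_guard.click(...)))" arms above rely on an idiom: the macro evaluates its argument for the side effect and yields false, so the branch body never runs and control falls through to the next else-if in the chain. Below is a minimal sketch of that idiom and of a checkpointing time guard; it is an illustration only, with hypothetical names, not OceanBase's actual FALSE_IT or ObTimeGuard implementation.

// Sketch of the FALSE_IT + time-guard idiom; names and details are
// hypothetical stand-ins, not OceanBase's real ObTimeGuard/FALSE_IT.
#include <chrono>
#include <cstdint>
#include <cstdio>

// Evaluate stmt for its side effect, then yield false, so an
// "else if (FALSE_IT(...))" arm never takes the branch and control
// falls through to the next condition.
#define FALSE_IT(stmt) (((void)(stmt)), false)

class TimeGuard {
public:
  using Clock = std::chrono::steady_clock;
  TimeGuard(const char *owner, int64_t warn_threshold_us)
      : owner_(owner), warn_threshold_us_(warn_threshold_us),
        start_(Clock::now()), last_(start_) {}
  // Record how long the phase since the previous click() took.
  void click(const char *phase) {
    const Clock::time_point now = Clock::now();
    const long long us = std::chrono::duration_cast<std::chrono::microseconds>(now - last_).count();
    std::printf("[%s] %s: %lld us\n", owner_, phase, us);
    last_ = now;
  }
  // Warn when the whole guarded scope exceeded the threshold.
  ~TimeGuard() {
    const long long total = std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - start_).count();
    if (total > warn_threshold_us_) {
      std::printf("[%s] total %lld us over threshold\n", owner_, total);
    }
  }
private:
  const char *owner_;
  int64_t warn_threshold_us_;
  Clock::time_point start_;
  Clock::time_point last_;
};

With this shape, a guard constructed as TimeGuard guard("pop_phyqueue", 100000) at the top of the scope plus guard.click("R_leave_heap")-style checkpoints yields per-phase timings and a warning when the whole pop path exceeds 100 ms, which is the pattern the hunks above add.
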
@@ -985,6 +985,7 @@ int ObIOSender::enqueue_request(ObIORequest &req)
   if (OB_FAIL(cond_guard.get_ret())) {
     LOG_ERROR("guard queue condition failed", K(ret));
   } else {
+    req.sender_index_ = sender_index_;
     ObIOGroupQueues *io_group_queues = nullptr;
     if (OB_FAIL(tenant_groups_map_.get_refactored(tmp_req->io_info_.tenant_id_, io_group_queues))) {
       LOG_WARN("get_refactored tenant_map failed", K(ret), K(req));
@@ -1253,7 +1254,7 @@ int ObIOSender::notify()
 
 int64_t ObIOSender::get_queue_count() const
 {
-  return OB_ISNULL(io_queue_) ? 0 : sender_req_count_;
+  return OB_ISNULL(io_queue_) ? 0 : ATOMIC_LOAD(&sender_req_count_);
 }
 
 int ObIOSender::get_sender_info(int64_t &reservation_ts,
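
The get_queue_count() change matters because sender_req_count_ is updated concurrently by the enqueue and dequeue paths while schedule_request reads it from other threads, and a plain load of such a counter is a data race. A rough standard-C++ analogue follows (the ATOMIC_* macros roughly wrap compiler atomic builtins; the type is illustrative, not the real ObIOSender):

// Readers of a counter that other threads update must use an atomic
// load; a plain read is a data race (undefined behavior) and can
// observe torn or stale values.
#include <atomic>
#include <cstdint>

struct SenderCounter {
  std::atomic<int64_t> sender_req_count{0};

  void on_enqueue() { sender_req_count.fetch_add(1, std::memory_order_relaxed); }
  void on_dequeue() { sender_req_count.fetch_sub(1, std::memory_order_relaxed); }

  // Equivalent in spirit to ATOMIC_LOAD(&sender_req_count_).
  int64_t get_queue_count() const {
    return sender_req_count.load(std::memory_order_relaxed);
  }
};
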
@@ -1381,11 +1382,13 @@ int ObIOSender::submit(ObIORequest &req)
 {
   int ret = OB_SUCCESS;
   ObDeviceChannel *device_channel = nullptr;
+  ObTimeGuard time_guard("submit_req", 100000); //100ms
   if (OB_UNLIKELY(stop_submit_)) {
     ret = OB_STATE_NOT_MATCH;
     LOG_WARN("sender stop submit", K(ret), K(stop_submit_));
   } else if (OB_FAIL(req.prepare())) {
     LOG_WARN("prepare io request failed", K(ret), K(req));
+  } else if (FALSE_IT(time_guard.click("prepare_req"))) {
   } else if (OB_FAIL(OB_IO_MANAGER.get_device_channel(req.io_info_.fd_.device_handle_, device_channel))) {
     LOG_WARN("get device channel failed", K(ret), K(req));
   } else {
@@ -1399,6 +1402,8 @@ int ObIOSender::submit(ObIORequest &req)
       if (OB_EAGAIN != ret) {
         LOG_WARN("submit io request failed", K(ret), K(req), KPC(device_channel));
       }
+    } else {
+      time_guard.click("device_submit");
     }
   }
   return ret;
@@ -1518,7 +1523,9 @@ int ObIOScheduler::schedule_request(ObTenantIOClock &io_clock, ObIORequest &req)
   // push the request into sender queue, balance channel queue count by random twice
   const int64_t idx1 = ObRandom::rand(0, senders_.count() - 1);
   const int64_t idx2 = ObRandom::rand(0, senders_.count() - 1);
-  const int64_t sender_idx = senders_.at(idx1)->sender_req_count_ < senders_.at(idx2)->sender_req_count_ ? idx1 : idx2;
+  const int64_t count1 = senders_.at(idx1)->get_queue_count();
+  const int64_t count2 = senders_.at(idx2)->get_queue_count();
+  const int64_t sender_idx = count1 < count2 ? idx1 : idx2;
   ObIOSender *sender = senders_.at(sender_idx);
   if (req.io_info_.fd_.device_handle_->media_id_ != schedule_media_id_) {
     // direct submit
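
Picking the less loaded of two randomly sampled senders is the classic power-of-two-choices trick: it avoids scanning every queue yet reduces imbalance dramatically compared with a single random pick, and the comparison now goes through get_queue_count() so both counters are read atomically. A self-contained sketch of the selection, reusing the shape of the illustrative SenderCounter above (again a toy type, not the real ObIOSender):

// Illustrative power-of-two-choices pick over atomic queue counts.
#include <atomic>
#include <cstdint>
#include <random>
#include <vector>

struct Sender {
  std::atomic<int64_t> req_count{0};
  int64_t queue_count() const { return req_count.load(std::memory_order_relaxed); }
};

size_t pick_sender(const std::vector<Sender> &senders, std::mt19937_64 &rng) {
  std::uniform_int_distribution<size_t> dist(0, senders.size() - 1);
  const size_t idx1 = dist(rng);  // first independent random candidate
  const size_t idx2 = dist(rng);  // second independent random candidate
  // Route to whichever candidate currently has the shorter queue.
  return senders[idx1].queue_count() <= senders[idx2].queue_count() ? idx1 : idx2;
}
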
@@ -1813,7 +1820,7 @@ int ObAsyncIOChannel::submit(ObIORequest &req)
     }
   } else if (device_channel_->used_io_depth_ > device_channel_->max_io_depth_) {
     ret = OB_EAGAIN;
-    LOG_DEBUG("reach max io depth", K(ret), K(device_channel_->used_io_depth_), K(device_channel_->max_io_depth_));
+    LOG_INFO("reach max io depth", K(ret), K(device_channel_->used_io_depth_), K(device_channel_->max_io_depth_));
   } else {
     ATOMIC_INC(&submit_count_);
     ATOMIC_FAA(&device_channel_->used_io_depth_, get_io_depth(req.io_size_));
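
This last hunk only raises the log level, making depth-based rejections visible at INFO: the channel reserves estimated in-flight depth with a fetch-add at submit time and bounces new requests with OB_EAGAIN once used_io_depth_ exceeds max_io_depth_. A sketch of that backpressure accounting follows; the completion-side release is an assumption (it does not appear in this diff), and all names are illustrative rather than OceanBase's:

// Depth-based backpressure: submit reserves depth with a fetch-add
// and is refused once the channel is saturated.
#include <atomic>
#include <cstdint>

struct DeviceChannelDepth {
  std::atomic<int64_t> used_io_depth{0};
  int64_t max_io_depth = 64;  // illustrative cap

  // Mirrors the check above: refuse (caller retries, like OB_EAGAIN)
  // when in-flight depth already exceeds the cap, else reserve.
  bool try_reserve(int64_t io_depth) {
    if (used_io_depth.load(std::memory_order_relaxed) > max_io_depth) {
      return false;
    }
    used_io_depth.fetch_add(io_depth, std::memory_order_relaxed);
    return true;
  }

  // Assumed completion path releasing what try_reserve added.
  void release(int64_t io_depth) {
    used_io_depth.fetch_sub(io_depth, std::memory_order_relaxed);
  }
};
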