add log for io_sender

renju96
2023-02-07 14:17:24 +08:00
committed by ob-robot
parent be78b2f3d8
commit fbc56d967c
4 changed files with 111 additions and 41 deletions

View File

@@ -300,6 +300,7 @@ ObIORequest::ObIORequest()
     has_estimated_(false),
     io_info_(),
     deadline_ts_(0),
+    sender_index_(0),
     control_block_(nullptr),
     raw_buf_(nullptr),
     io_buf_(nullptr),
@@ -375,6 +376,7 @@ void ObIORequest::destroy()
   is_canceled_ = false;
   has_estimated_ = false;
   deadline_ts_ = 0;
+  sender_index_ = 0;
   if (nullptr != control_block_ && nullptr != io_info_.fd_.device_handle_) {
     io_info_.fd_.device_handle_->free_iocb(control_block_);
     control_block_ = nullptr;
@@ -1277,7 +1279,7 @@ void ObAtomIOClock::reset()
 ObMClockQueue::ObMClockQueue()
   : is_inited_(false),
     r_heap_(r_cmp_),
-    cl_heap_(cl_cmp_),
+    gl_heap_(gl_cmp_),
     tl_heap_(tl_cmp_),
     ready_heap_(p_cmp_)
 {
@@ -1309,6 +1311,30 @@ void ObMClockQueue::destroy()
   is_inited_ = false;
 }
 
+int ObMClockQueue::get_time_info(int64_t &reservation_ts,
+                                 int64_t &group_limitation_ts,
+                                 int64_t &tenant_limitation_ts,
+                                 int64_t &proportion_ts)
+{
+  int ret = OB_SUCCESS;
+  if (OB_UNLIKELY(!is_inited_ || r_heap_.empty())) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("not init yet", K(ret), K(is_inited_));
+  } else {
+    ObPhyQueue *r_phy_queue = r_heap_.top();
+    if (OB_ISNULL(r_phy_queue)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("phy_queue is null", K(ret));
+    } else {
+      reservation_ts = r_phy_queue->reservation_ts_;
+      group_limitation_ts = gl_heap_.empty() ? INT64_MAX : gl_heap_.top()->group_limitation_ts_;
+      tenant_limitation_ts = tl_heap_.empty() ? INT64_MAX : tl_heap_.top()->tenant_limitation_ts_;
+      proportion_ts = ready_heap_.empty() ? INT64_MAX : ready_heap_.top()->proportion_ts_;
+    }
+  }
+  return ret;
+}
+
 int ObMClockQueue::push_phyqueue(ObPhyQueue *phy_queue)
 {
   int ret = OB_SUCCESS;
@@ -1321,7 +1347,7 @@ int ObMClockQueue::push_phyqueue(ObPhyQueue *phy_queue)
     LOG_WARN("phy_queue is null", K(ret), KP(phy_queue));
   } else if (OB_FAIL(r_heap_.push(phy_queue))) {
     LOG_WARN("push r heap failed", K(ret));
-  } else if (OB_FAIL(cl_heap_.push(phy_queue))) {
+  } else if (OB_FAIL(gl_heap_.push(phy_queue))) {
     LOG_WARN("push cl heap failed", K(ret));
     int tmp_ret = r_heap_.remove(phy_queue);
     if (OB_SUCCESS != tmp_ret) {
@@ -1401,7 +1427,7 @@ int ObMClockQueue::remove_from_heap(ObPhyQueue *phy_queue)
   } else if (OB_FAIL(r_heap_.remove(phy_queue))) {
     LOG_WARN("remove phy queue from r heap failed", K(ret));
   } else if (!phy_queue->is_group_ready_ && !phy_queue->is_tenant_ready_) {
-    if (OB_FAIL(cl_heap_.remove(phy_queue))) {
+    if (OB_FAIL(gl_heap_.remove(phy_queue))) {
      LOG_WARN("remove phy queue from cl heap failed", K(ret));
     }
   } else if (phy_queue->is_group_ready_ && !phy_queue->is_tenant_ready_) {
@@ -1424,24 +1450,24 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
   int64_t iter_count = 0;
   ObPhyQueue *tmp_phy_queue = nullptr;
   req = nullptr;
-  while (OB_SUCC(ret) && !cl_heap_.empty() && !cl_heap_.top()->req_list_.is_empty()) {
-    tmp_phy_queue = cl_heap_.top();
+  while (OB_SUCC(ret) && !gl_heap_.empty() && !gl_heap_.top()->req_list_.is_empty()) {
+    tmp_phy_queue = gl_heap_.top();
     deadline_ts = 0 == iter_count ? tmp_phy_queue->tenant_limitation_ts_ : deadline_ts;
     ++iter_count;
     if (tmp_phy_queue->group_limitation_ts_ > current_ts) {
       break;
-    } else if (OB_FAIL(cl_heap_.pop())) {
+    } else if (OB_FAIL(gl_heap_.pop())) {
       LOG_WARN("remove PhyQueue from c_limitation queue failed", K(ret));
     } else {
       tmp_phy_queue->is_group_ready_ = true;
       if (tmp_phy_queue->tenant_limitation_ts_ <= current_ts) {
         tmp_phy_queue->is_tenant_ready_ = true;
         if (OB_FAIL(ready_heap_.push(tmp_phy_queue))) {
-          LOG_WARN("push phy_queue from cl_heap to ready_heap failed", K(ret));
+          LOG_WARN("push phy_queue from gl_heap to ready_heap failed", K(ret));
         }
       } else {
         if (OB_FAIL(tl_heap_.push(tmp_phy_queue))) {
-          LOG_WARN("push phy_queue from cl_heap to tl_heap failed", K(ret));
+          LOG_WARN("push phy_queue from gl_heap to tl_heap failed", K(ret));
         }
       }
     }
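The pop_with_ready_queue() hunk above shows the gating that the renamed heap participates in: a physical queue first waits in gl_heap_ until its group limitation timestamp is due, then either becomes fully ready (pushed to ready_heap_, ordered by proportion_ts_) or keeps waiting in tl_heap_ for its tenant limitation timestamp. Below is a minimal sketch of that two-stage gating using standard containers; PhyQueue, TsCmp, and promote_ready are simplified stand-ins for illustration, not the real ObPhyQueue/ObRemovableHeap types.

// Simplified sketch of the gating in ObMClockQueue::pop_with_ready_queue();
// all names here are illustrative stand-ins for the OceanBase classes.
#include <cstdint>
#include <queue>
#include <vector>

struct PhyQueue {
  int64_t group_limitation_ts;   // earliest dispatch time allowed by the group limit
  int64_t tenant_limitation_ts;  // earliest dispatch time allowed by the tenant limit
  int64_t proportion_ts;         // weight-based tag that orders fully ready queues
};

// Min-heap of queue pointers keyed by one of the timestamp members above.
struct TsCmp {
  int64_t PhyQueue::*key;
  bool operator()(const PhyQueue *a, const PhyQueue *b) const { return a->*key > b->*key; }
};
using TsHeap = std::priority_queue<PhyQueue *, std::vector<PhyQueue *>, TsCmp>;

// Move queues whose group limit is satisfied at current_ts out of gl_heap:
// if the tenant limit is also satisfied, the queue is ready for dispatch;
// otherwise it waits in tl_heap until its tenant limitation timestamp passes.
void promote_ready(TsHeap &gl_heap, TsHeap &tl_heap, TsHeap &ready_heap, const int64_t current_ts) {
  while (!gl_heap.empty() && gl_heap.top()->group_limitation_ts <= current_ts) {
    PhyQueue *q = gl_heap.top();
    gl_heap.pop();
    if (q->tenant_limitation_ts <= current_ts) {
      ready_heap.push(q);
    } else {
      tl_heap.push(q);
    }
  }
}

The real code uses ObRemovableHeap rather than std::priority_queue so that remove_from_heap() can delete a queue from the middle of any heap when it is stopped.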

View File

@@ -220,7 +220,7 @@ public:
   void inc_out_ref();
   void dec_out_ref();
   VIRTUAL_TO_STRING_KV(K(is_inited_), K(is_finished_), K(is_canceled_), K(has_estimated_), K(io_info_), K(deadline_ts_),
-                       KP(control_block_), KP(raw_buf_), KP(io_buf_), K(io_offset_), K(io_size_), K(complete_size_),
+                       KP(control_block_), KP(raw_buf_), KP(io_buf_), K(io_offset_), K(io_size_), K(complete_size_), K(sender_index_),
                        K(time_log_), KP(channel_), K(ref_cnt_), K(out_ref_cnt_),
                        K(trace_id_), K(ret_code_), K(retry_count_), K(callback_buf_size_), KP(copied_callback_), K(tenant_io_mgr_));
 private:
@@ -232,6 +232,7 @@ public:
   bool has_estimated_;
   ObIOInfo io_info_;
   int64_t deadline_ts_;
+  int64_t sender_index_;
   ObIOCB *control_block_;
   void *raw_buf_;
   char *io_buf_;
@@ -392,6 +393,10 @@ public:
   virtual ~ObMClockQueue();
   virtual int init() override;
   virtual void destroy() override;
+  int get_time_info(int64_t &reservation_ts,
+                    int64_t &group_limitation_ts,
+                    int64_t &tenant_limitation_ts,
+                    int64_t &proportion_ts);
   int push_phyqueue(ObPhyQueue *phy_queue);
   int pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts);
   TO_STRING_KV(K(is_inited_));
@@ -410,11 +415,11 @@ private:
 private:
   bool is_inited_;
   HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_> r_cmp_;
-  HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_> cl_cmp_;
+  HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_> gl_cmp_;
   HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_> tl_cmp_;
   HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_> p_cmp_;
   ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::reservation_ts_>, &ObPhyQueue::reservation_pos_> r_heap_;
-  ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_>, &ObPhyQueue::group_limitation_pos_> cl_heap_;
+  ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::group_limitation_ts_>, &ObPhyQueue::group_limitation_pos_> gl_heap_;
   ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::tenant_limitation_ts_>, &ObPhyQueue::tenant_limitation_pos_> tl_heap_;
   ObRemovableHeap<ObPhyQueue *, HeapCompare<ObPhyQueue, &ObPhyQueue::proportion_ts_>, &ObPhyQueue::proportion_pos_> ready_heap_;
 };

View File

@@ -688,6 +688,7 @@ void ObIOTuner::run1()
     // print interval must <= 1s, for ensuring real_iops >= 1 in gv$ob_io_quota.
     if (REACH_TIME_INTERVAL(1000L * 1000L * 1L)) {
       print_io_status();
+      print_sender_status();
     }
     ob_usleep(100 * 1000); // 100ms
   }
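For context on the hunk above: run1() wakes every 100 ms, but REACH_TIME_INTERVAL(1000L * 1000L * 1L) lets the two print calls through at most once per second, which keeps the new [IO SENDER STATUS] lines from flooding the log while still honoring the "interval must <= 1s" comment. A rough sketch of that throttling idea follows; reach_time_interval below is an illustrative stand-in, not the real macro (which keeps its last-fire time in a hidden static variable).

// Illustrative throttle: return true at most once per interval_us, given the
// caller's monotonic clock now_us and a persistent last_us slot.
#include <cstdint>

bool reach_time_interval(const int64_t interval_us, const int64_t now_us, int64_t &last_us) {
  bool reached = false;
  if (now_us - last_us >= interval_us) {
    last_us = now_us;  // remember when the interval last fired
    reached = true;
  }
  return reached;
}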
@@ -695,6 +696,29 @@ void ObIOTuner::run1()
   }
 }
 
+void ObIOTuner::print_sender_status()
+{
+  int ret = OB_SUCCESS;
+  for (int64_t i = 0; OB_SUCC(ret) && i < io_scheduler_.senders_.count(); ++i) {
+    ObIOSender *sender = io_scheduler_.senders_.at(i);
+    if (OB_ISNULL(sender)) {
+      ret = OB_ERR_UNEXPECTED;
+      LOG_WARN("io sender is null", K(ret), K(i));
+    } else {
+      int64_t reservation_ts = 0;
+      int64_t group_limitation_ts = 0;
+      int64_t tenant_limitation_ts = 0;
+      int64_t proportion_ts = 0;
+      ret = sender->get_sender_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts);
+      if (OB_NOT_INIT != ret) {
+        LOG_INFO("[IO SENDER STATUS]", "send_index", sender->sender_index_, "req_count", sender->get_queue_count(),
+                 K(reservation_ts), K(group_limitation_ts), K(tenant_limitation_ts), K(proportion_ts));
+      }
+    }
+  }
+}
+
 void ObIOTuner::print_io_status()
 {
   int ret = OB_SUCCESS;
@@ -702,23 +726,6 @@ void ObIOTuner::print_io_status()
   if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) {
     LOG_WARN("get tenant id failed", K(ret));
   } else if (tenant_ids.count() > 0) {
-    ObArray<int64_t> queue_count_array;
-    if (OB_FAIL(queue_count_array.reserve(io_scheduler_.senders_.count()))) {
-      LOG_WARN("reserve queue count array failed", K(ret), K(io_scheduler_.senders_.count()));
-    }
-    for (int64_t i = 0; OB_SUCC(ret) && i < io_scheduler_.senders_.count(); ++i) {
-      ObIOSender *sender = io_scheduler_.senders_.at(i);
-      if (OB_ISNULL(sender)) {
-        ret = OB_ERR_UNEXPECTED;
-        LOG_WARN("io sender is null", K(ret), K(i));
-      } else if (OB_FAIL(queue_count_array.push_back(sender->get_queue_count()))) {
-        LOG_WARN("push back queue count failed", K(ret));
-      }
-    }
-    if (OB_SUCC(ret)) {
-      LOG_INFO("[IO STATUS]", K(tenant_ids), "send_thread_count", io_scheduler_.senders_.count(), "send_queues", queue_count_array);
-    }
-  }
     for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
       const uint64_t cur_tenant_id = tenant_ids.at(i);
       ObRefHolder<ObTenantIOManager> tenant_holder;
@@ -732,6 +739,7 @@ void ObIOTuner::print_io_status()
         tenant_holder.get_ptr()->print_io_status();
       }
     }
+  }
 }
 
 /****************** ObIOGroupQueues **********************/
@@ -815,6 +823,7 @@ ObSenderInfo::~ObSenderInfo()
 /****************** IOScheduleQueue **********************/
 ObIOSender::ObIOSender(ObIAllocator &allocator)
   : sender_req_count_(0),
+    sender_index_(0),
     tg_id_(-1),
     is_inited_(false),
     stop_submit_(false),
@@ -830,7 +839,7 @@ ObIOSender::~ObIOSender()
   destroy();
 }
 
-int ObIOSender::init()
+int ObIOSender::init(const int64_t sender_index)
 {
   int ret = OB_SUCCESS;
   if (OB_UNLIKELY(is_inited_)) {
@@ -849,6 +858,7 @@ int ObIOSender::init()
   } else {
     is_inited_ = true;
     sender_req_count_ = 0;
+    sender_index_ = sender_index;
     LOG_INFO("io sender init succ", KCSTRING(lbt()));
   }
@@ -910,6 +920,7 @@ void ObIOSender::destroy()
   is_inited_ = false;
   stop_submit_ = false;
   sender_req_count_ = 0;
+  sender_index_ = 0;
   LOG_INFO("io sender destroyed", KCSTRING(lbt()));
 }
@@ -1245,6 +1256,26 @@ int64_t ObIOSender::get_queue_count() const
   return OB_ISNULL(io_queue_) ? 0 : sender_req_count_;
 }
 
+int ObIOSender::get_sender_info(int64_t &reservation_ts,
+                                int64_t &group_limitation_ts,
+                                int64_t &tenant_limitation_ts,
+                                int64_t &proportion_ts)
+{
+  int ret = OB_SUCCESS;
+  if (!is_inited_) {
+    ret = OB_NOT_INIT;
+    LOG_WARN("Not init", K(ret), K(is_inited_));
+  } else {
+    ObThreadCondGuard cond_guard(queue_cond_);
+    if (OB_FAIL(cond_guard.get_ret())) {
+      LOG_ERROR("guard queue condition failed", K(ret));
+    } else {
+      ret = io_queue_->get_time_info(reservation_ts, group_limitation_ts, tenant_limitation_ts, proportion_ts);
+    }
+  }
+  return ret;
+}
+
 int ObIOSender::get_sender_status(const uint64_t tenant_id, const uint64_t index, ObSenderInfo &sender_info)
 {
   int ret = OB_SUCCESS;
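Two details of the new accessor are worth noting. It takes queue_cond_ before reading the heaps, presumably so the heap tops stay stable while the sender thread is pushing or popping requests, and it simply propagates OB_NOT_INIT when the sender or its reservation heap is empty, which is exactly the return code print_sender_status() filters out above. For the three timestamps that come from possibly empty heaps, get_time_info() substitutes INT64_MAX, so a caller that formats the values has to treat INT64_MAX as "nothing pending". A small illustrative helper (not part of the commit) for that interpretation:

// Illustrative only: render a limitation timestamp returned by
// ObIOSender::get_sender_info(), where INT64_MAX means the matching heap was empty.
#include <cstdint>
#include <cstdio>

void print_limitation_ts(const char *name, const int64_t ts) {
  if (INT64_MAX == ts) {
    std::printf("%s=none ", name);                              // no queue waiting on this limit
  } else {
    std::printf("%s=%lld ", name, static_cast<long long>(ts));  // absolute deadline timestamp
  }
}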
@@ -1403,11 +1434,12 @@ int ObIOScheduler::init(const int64_t queue_count, const int64_t schedule_media_
   for (int64_t i = 0; OB_SUCC(ret) && i < queue_count; ++i) {
     void *buf = nullptr;
     ObIOSender *tmp_sender = nullptr;
+    int64_t sender_index = i + 1;
     if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObIOSender)))) {
       ret = OB_ALLOCATE_MEMORY_FAILED;
       LOG_WARN("allocate memory failed", K(ret));
     } else if (FALSE_IT(tmp_sender = new (buf) ObIOSender(allocator_))) {
-    } else if (OB_FAIL(tmp_sender->init())) {
+    } else if (OB_FAIL(tmp_sender->init(sender_index))) {
       LOG_WARN("init io sender failed", K(ret), K(i), K(*tmp_sender));
     } else if (OB_FAIL(senders_.push_back(tmp_sender))) {
       LOG_WARN("push back io sender failed", K(ret), K(i), K(*tmp_sender));
@@ -2902,6 +2934,7 @@ void ObIOFaultDetector::set_device_warning()
 {
   last_device_warning_ts_ = ObTimeUtility::fast_current_time();
   is_device_warning_ = true;
+  LOG_WARN_RET(OB_IO_ERROR, "disk maybe too slow");
 }
 
 // set disk error and record error_ts

View File

@@ -184,6 +184,7 @@ public:
   virtual void run1() override;
 
 private:
+  void print_sender_status();
   void print_io_status();
 private:
   bool is_inited_;
@@ -224,7 +225,7 @@ class ObIOSender : public lib::TGRunnable
 public:
   ObIOSender(ObIAllocator &allocator);
   virtual ~ObIOSender();
-  int init();
+  int init(const int64_t sender_index);
   void stop();
   void wait();
   void destroy();
@@ -243,13 +244,18 @@ public:
   int stop_phy_queue(const uint64_t tenant_id, const uint64_t index);
   int notify();
   int64_t get_queue_count() const;
+  int get_sender_info(int64_t &reservation_ts,
+                      int64_t &group_limitation_ts,
+                      int64_t &tenant_limitation_ts,
+                      int64_t &proportion_ts);
   int get_sender_status(const uint64_t tenant_id, const uint64_t index, ObSenderInfo &sender_info);
-  TO_STRING_KV(K(is_inited_), K(stop_submit_), KPC(io_queue_), K(tg_id_));
+  TO_STRING_KV(K(is_inited_), K(stop_submit_), KPC(io_queue_), K(tg_id_), K(sender_index_));
 //private:
   void pop_and_submit();
   int64_t calc_wait_timeout(const int64_t queue_deadline);
   int submit(ObIORequest &req);
   int64_t sender_req_count_;
+  int64_t sender_index_;
   int tg_id_; // thread group id
   bool is_inited_;
   bool stop_submit_;