Modify the fixed-length queue to a linked list

This commit is contained in:
obdev
2022-11-03 05:41:02 +00:00
committed by wangzelin.wzl
parent 33672fdd50
commit 4fc4c38ef4
5 changed files with 96 additions and 74 deletions

View File

@ -709,7 +709,6 @@ void ObIORequest::dec_out_ref()
ObPhyQueue::ObPhyQueue()
: is_inited_(false),
phy_queue_(),
reservation_ts_(INT_MAX64),
category_limitation_ts_(INT_MAX64),
tenant_limitation_ts_(INT_MAX64),
@ -720,7 +719,8 @@ ObPhyQueue::ObPhyQueue()
reservation_pos_(-1),
category_limitation_pos_(-1),
tenant_limitation_pos_(-1),
proportion_pos_(-1)
proportion_pos_(-1),
req_list_()
{
}
@ -736,8 +736,6 @@ int ObPhyQueue::init(const int index)
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("phy queue init twice", K(ret), K(is_inited_));
} else if (OB_FAIL(phy_queue_.init(SINGLE_QUEUE_DEPTH))) {
LOG_WARN("phy queue init failed", K(ret));
} else if (index < 0 || index > static_cast<int>(ObIOCategory::MAX_CATEGORY)){
ret = OB_INVALID_ARGUMENT;
LOG_WARN("index out of boundary", K(ret), K(index));
@ -754,7 +752,6 @@ int ObPhyQueue::init(const int index)
void ObPhyQueue::destroy()
{
is_inited_ = false;
phy_queue_.destroy();
reservation_ts_ = INT_MAX64;
category_limitation_ts_ = INT_MAX64;
tenant_limitation_ts_ = INT_MAX64;
@ -1293,32 +1290,29 @@ int ObMClockQueue::pop_phyqueue(ObIORequest *&req, int64_t &deadline_ts)
if (OB_ISNULL(tmp_phy_queue)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy_queue is null", K(ret), KP(tmp_phy_queue));
} else if (tmp_phy_queue->phy_queue_.get_total() <= 0) {
} else if (tmp_phy_queue->req_list_.is_empty()) {
ret = OB_ENTRY_NOT_EXIST;
} else if (tmp_phy_queue->reservation_ts_ <= current_ts) {
//R schedule
if(OB_FAIL(remove_from_heap(tmp_phy_queue))) {
LOG_WARN("remove phy queue from heap failed(R schedule)", K(ret));
} else {
if (OB_FAIL(tmp_phy_queue->phy_queue_.pop(req))) {
LOG_WARN("pop req from r_heap failed", K(ret));
} else if (OB_ISNULL(req)) {
req = tmp_phy_queue->req_list_.remove_first();
if (OB_ISNULL(req)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("request is null", K(ret), KP(req));
} else {
req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
if (tmp_phy_queue->phy_queue_.get_total() <= 0) {
if (tmp_phy_queue->req_list_.is_empty()) {
tmp_phy_queue->reset_time_info();
} else if (OB_NOT_NULL(req->tenant_io_mgr_.get_ptr())) {
ObTenantIOClock *io_clock = static_cast<ObTenantIOClock *>(req->tenant_io_mgr_.get_ptr()->get_io_clock());
ObIORequest *next_req = nullptr;
if (OB_FAIL(tmp_phy_queue->phy_queue_.head_unsafe(next_req))) {
LOG_WARN("get next req failed", K(ret), KP(next_req));
} else if (OB_ISNULL(next_req)) {
ObIORequest *next_req = tmp_phy_queue->req_list_.get_first();
if (OB_ISNULL(next_req)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null next_req", KP(next_req));
} else if (OB_FAIL(io_clock->calc_phyqueue_clock(tmp_phy_queue, *next_req))) {
LOG_WARN("calc phyqueue clock failed", K(ret));
LOG_WARN("calc phyqueue clock failed", K(ret), KPC(next_req));
}
}
}
@ -1372,7 +1366,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
int64_t iter_count = 0;
ObPhyQueue *tmp_phy_queue = nullptr;
req = nullptr;
while (OB_SUCC(ret) && !cl_heap_.empty() && cl_heap_.top()->phy_queue_.get_total() > 0) {
while (OB_SUCC(ret) && !cl_heap_.empty() && !cl_heap_.top()->req_list_.is_empty()) {
tmp_phy_queue = cl_heap_.top();
deadline_ts = 0 == iter_count ? tmp_phy_queue->tenant_limitation_ts_ : deadline_ts;
++iter_count;
@ -1395,7 +1389,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
}
}
iter_count = 0;
while (OB_SUCC(ret) && !tl_heap_.empty() && tl_heap_.top()->phy_queue_.get_total() > 0) {
while (OB_SUCC(ret) && !tl_heap_.empty() && !tl_heap_.top()->req_list_.is_empty()) {
tmp_phy_queue = tl_heap_.top();
if (0 == iter_count) {
if (0 == deadline_ts) {
@ -1421,26 +1415,23 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
if (OB_ISNULL(tmp_phy_queue)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("phy_queue is null", K(ret));
} else if (tmp_phy_queue->phy_queue_.get_total() > 0) {
} else if (!tmp_phy_queue->req_list_.is_empty()) {
if (OB_FAIL(remove_from_heap(tmp_phy_queue))) {
LOG_WARN("remove phy queue from heap failed(P schedule)", K(ret));
} else {
if (OB_FAIL(tmp_phy_queue->phy_queue_.pop(req))) {
LOG_WARN("pop req from ready_heap failed");
} else if (OB_ISNULL(req)) {
req = tmp_phy_queue->req_list_.remove_first();
if (OB_ISNULL(req)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("req is null", K(ret), KP(req));
} else {
req->time_log_.dequeue_ts_ = ObTimeUtility::fast_current_time();
LOG_DEBUG("req pop from phy queue succcess(P schedule)", KP(req), K(iter_count), "time_cost", ObTimeUtility::fast_current_time() - current_ts, K(ready_heap_.count()), K(current_ts));
if (tmp_phy_queue->phy_queue_.get_total() <= 0) {
if (tmp_phy_queue->req_list_.is_empty()) {
tmp_phy_queue->reset_time_info();
} else if (OB_NOT_NULL(req->tenant_io_mgr_.get_ptr())) {
ObTenantIOClock *io_clock = static_cast<ObTenantIOClock *>(req->tenant_io_mgr_.get_ptr()->get_io_clock());
ObIORequest *next_req = nullptr;
if (OB_FAIL(tmp_phy_queue->phy_queue_.head_unsafe(next_req))) {
LOG_WARN("get next req failed", K(ret), K(next_req));
} else if (OB_ISNULL(next_req)) {
ObIORequest *next_req = tmp_phy_queue->req_list_.get_first();
if (OB_ISNULL(next_req)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get null next_req", KP(next_req));
} else {
@ -1462,7 +1453,7 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *&
}
} else {
ret = OB_EAGAIN;
if (!r_heap_.empty() && r_heap_.top()->phy_queue_.get_total() > 0) {
if (!r_heap_.empty() && !r_heap_.top()->req_list_.is_empty()) {
ObPhyQueue *next_tmp_phy_queue = r_heap_.top();
if (0 == deadline_ts) {
deadline_ts = next_tmp_phy_queue->reservation_ts_;

View File

@ -18,7 +18,7 @@
#include "lib/lock/ob_thread_cond.h"
#include "lib/container/ob_rbtree.h"
#include "common/storage/ob_io_device.h"
#include "lib/queue/ob_fixed_queue.h"
#include "lib/list/ob_list.h"
#include "lib/container/ob_heap.h"
namespace oceanbase
@ -207,7 +207,7 @@ class ObIOChannel;
class ObIOSender;
class ObIOUsage;
class ObIORequest
class ObIORequest : public common::ObDLinkBase<ObIORequest>
{
public:
ObIORequest();
@ -272,10 +272,9 @@ public:
void reset_time_info();
void reset_queue_info();
public:
typedef common::ObDList<ObIORequest> IOReqList;
TO_STRING_KV(K_(reservation_ts), K_(category_limitation_ts), K_(tenant_limitation_ts));
bool is_inited_;
ObFixedQueue<ObIORequest> phy_queue_;
static const int32_t SINGLE_QUEUE_DEPTH = 10000;
int64_t reservation_ts_;
int64_t category_limitation_ts_;
int64_t tenant_limitation_ts_;
@ -287,6 +286,7 @@ public:
int64_t category_limitation_pos_;
int64_t tenant_limitation_pos_;
int64_t proportion_pos_;
IOReqList req_list_;
};
class ObIOHandle final

View File

@ -634,12 +634,40 @@ void ObIOTuner::print_io_status()
}
}
/****************** ObPhyQueue **********************/
ObTenantPhyQueues::ObTenantPhyQueues()
/****************** ObIOCategoryQueues **********************/
ObIOCategoryQueues::ObIOCategoryQueues()
: is_inited_(false)
{
}
ObIOCategoryQueues::~ObIOCategoryQueues()
{
destroy();
}
int ObIOCategoryQueues::init()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("phy queue init twice", K(ret), K(is_inited_));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; ++i) {
if (OB_FAIL(phy_queues_[i].init(i))){
LOG_WARN("phy queue init failed", K(ret));
}
}
}
if (OB_SUCC(ret)) {
is_inited_ = true;
}
return ret;
}
void ObIOCategoryQueues::destroy()
{
is_inited_ = false;
}
/****************** IOScheduleQueue **********************/
ObIOSender::ObIOSender(ObIAllocator &allocator)
: is_inited_(false),
@ -693,11 +721,9 @@ struct DestroyPhyqueueMapFn
{
public:
DestroyPhyqueueMapFn(ObIAllocator &allocator) : allocator_(allocator) {}
int operator () (hash::HashMapPair<uint64_t, ObTenantPhyQueues *> &entry) {
int operator () (hash::HashMapPair<uint64_t, ObIOCategoryQueues *> &entry) {
if (nullptr != entry.second) {
for (int64_t i = 0; i < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; ++i) {
entry.second->phy_queues_[i].destroy();
}
entry.second->~ObIOCategoryQueues();
allocator_.free(entry.second);
}
return OB_SUCCESS;
@ -797,7 +823,7 @@ int ObIOSender::alloc_mclock_queue(ObIAllocator &allocator, ObMClockQueue *&io_q
int ObIOSender::enqueue_request(ObIORequest &req)
{
int ret = OB_SUCCESS;
ObTenantPhyQueues *tenant_phy_queues = nullptr;
ObIOCategoryQueues *io_category_queues = nullptr;
ObIORequest *tmp_req = &req;
if (!is_inited_) {
ret = OB_NOT_INIT;
@ -807,18 +833,19 @@ int ObIOSender::enqueue_request(ObIORequest &req)
if (OB_FAIL(cond_guard.get_ret())) {
LOG_ERROR("guard queue condition failed", K(ret));
} else {
if (OB_FAIL(tenant_map_.get_refactored(tmp_req->io_info_.tenant_id_, tenant_phy_queues))) {
if (OB_FAIL(tenant_map_.get_refactored(tmp_req->io_info_.tenant_id_, io_category_queues))) {
LOG_WARN("get_refactored tenant_map failed", K(ret), K(req));
} else {
const int index = static_cast<int>(tmp_req->get_category());
ObPhyQueue *tmp_phy_queue = &(tenant_phy_queues->phy_queues_[index]);
if (tmp_phy_queue->phy_queue_.get_total() <= 0) {
ObPhyQueue *tmp_phy_queue = &(io_category_queues->phy_queues_[index]);
if (tmp_phy_queue->req_list_.is_empty()) {
//new request
if (OB_FAIL(io_queue_->remove_from_heap(tmp_phy_queue))) {
LOG_WARN("remove phy queue from heap failed", K(ret), K(index));
} else {
req.inc_ref("phyqueue_inc"); //ref for phy_queue
if (OB_FAIL(tmp_phy_queue->phy_queue_.push(tmp_req))) {
req.inc_ref("phyqueue_inc"); //ref for phy_queue
if (OB_UNLIKELY(!tmp_phy_queue->req_list_.add_last(tmp_req))) {
ret = OB_ERR_UNEXPECTED;
req.dec_ref("phyqueue_dec"); //ref for phy_queue
tmp_phy_queue->reset_time_info();
LOG_WARN("push new req into phy queue failed", K(ret));
@ -846,7 +873,8 @@ int ObIOSender::enqueue_request(ObIORequest &req)
} else {
//not new req, into phy_queue and line up
req.inc_ref("phyqueue_inc"); //ref for phy_queue
if (OB_FAIL(tmp_phy_queue->phy_queue_.push(tmp_req))) {
if (OB_UNLIKELY(!tmp_phy_queue->req_list_.add_last(tmp_req))) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("req line up failed", K(req));
req.dec_ref("phyqueue_dec"); //ref for phy_queue
} else {
@ -865,7 +893,7 @@ int ObIOSender::enqueue_request(ObIORequest &req)
return ret;
}
int ObIOSender::enqueue_phy_queue(ObPhyQueue *phyqueue)
int ObIOSender::enqueue_phy_queue(ObPhyQueue &phyqueue)
{
int ret = OB_SUCCESS;
if (!is_inited_) {
@ -876,7 +904,7 @@ int ObIOSender::enqueue_phy_queue(ObPhyQueue *phyqueue)
if (OB_FAIL(cond_guard.get_ret())) {
LOG_ERROR("guard queue condition failed", K(ret));
} else {
if (OB_FAIL(io_queue_->push_phyqueue(phyqueue))) {
if (OB_FAIL(io_queue_->push_phyqueue(&phyqueue))) {
LOG_WARN("push phyqueue into queue failed", K(ret));
} else {
if (OB_FAIL(queue_cond_.signal())) {
@ -930,20 +958,19 @@ int ObIOSender::remove_phy_queue(const uint64_t tenant_id)
if (OB_FAIL(cond_guard.get_ret())) {
LOG_ERROR("guard queue condition failed", K(ret));
} else {
ObTenantPhyQueues *tenant_phy_queues = nullptr;
if (OB_FAIL(tenant_map_.erase_refactored(tenant_id, &tenant_phy_queues))) {
ObIOCategoryQueues *io_category_queues = nullptr;
if (OB_FAIL(tenant_map_.erase_refactored(tenant_id, &io_category_queues))) {
LOG_WARN("erase phy_queues failed", K(ret), K(tenant_id));
} else if (nullptr != tenant_phy_queues) {
} else if (nullptr != io_category_queues) {
for (int64_t j = 0; OB_SUCC(ret) && j < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; ++j) {
ObPhyQueue *tmp_phy_queue = &(tenant_phy_queues->phy_queues_[j]);
ObPhyQueue *tmp_phy_queue = &(io_category_queues->phy_queues_[j]);
if (OB_FAIL(io_queue_->remove_from_heap(tmp_phy_queue))) {
LOG_WARN("remove phy queue from heap failed", K(ret));
} else {
tmp_phy_queue->~ObPhyQueue();
}
}
if (OB_SUCC(ret)) {
allocator_.free(tenant_phy_queues);
io_category_queues->~ObIOCategoryQueues();
allocator_.free(io_category_queues);
}
}
}
@ -1202,29 +1229,29 @@ int ObIOScheduler::add_tenant_map(uint64_t tenant_id)
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < senders_.count(); ++i) {
ObIOSender *cur_sender = senders_.at(i);
ObTenantPhyQueues *tenant_phy_queues = nullptr;
ObIOCategoryQueues *io_category_queues = nullptr;
void *buf_queues = nullptr;
if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObTenantPhyQueues)))) {
if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObIOCategoryQueues)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate tenant phyqueues memory failed", K(ret));
LOG_WARN("allocate phyqueues memory failed", K(ret));
} else {
tenant_phy_queues = new (buf_queues) ObTenantPhyQueues;
for(int64_t j = 0; OB_SUCC(ret) && j < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; j++) {
if (OB_FAIL(tenant_phy_queues->phy_queues_[j].init(j))){
LOG_WARN("phy queue init failed", K(ret));
} else if (OB_FAIL(cur_sender->enqueue_phy_queue(&(tenant_phy_queues->phy_queues_[j])))) {
LOG_WARN("new phy_queue into send_queue failed", K(ret));
io_category_queues = new (buf_queues) ObIOCategoryQueues();
if (OB_FAIL(io_category_queues->init())) {
LOG_WARN("init phyqueues failed", K(ret));
} else {
for(int64_t j = 0; OB_SUCC(ret) && j < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; j++) {
if (OB_FAIL(cur_sender->enqueue_phy_queue(io_category_queues->phy_queues_[j]))) {
LOG_WARN("new phy_queue into send_queue failed", K(ret));
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(cur_sender->tenant_map_.set_refactored(tenant_id, tenant_phy_queues))) {
if (OB_FAIL(cur_sender->tenant_map_.set_refactored(tenant_id, io_category_queues))) {
LOG_WARN("init tenant map failed", K(ret), K(i));
}
} else {
for(int64_t j = 0; OB_SUCC(ret) && j < static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1; j++) {
tenant_phy_queues->phy_queues_[j].~ObPhyQueue();
}
cur_sender->allocator_.free(tenant_phy_queues);
io_category_queues->~ObIOCategoryQueues();
cur_sender->allocator_.free(io_category_queues);
}
}
}

View File

@ -187,11 +187,14 @@ private:
ObIOScheduler &io_scheduler_;
};
struct ObTenantPhyQueues final {
struct ObIOCategoryQueues final {
public:
ObTenantPhyQueues();
~ObTenantPhyQueues();
public:
ObIOCategoryQueues();
~ObIOCategoryQueues();
int init();
void destroy();
public:
bool is_inited_;
ObPhyQueue phy_queues_[static_cast<int>(ObIOCategory::MAX_CATEGORY) + 1];
};
@ -212,7 +215,7 @@ public:
int alloc_mclock_queue(ObIAllocator &allocator, ObMClockQueue *&io_queue);
int enqueue_request(ObIORequest &req);
int enqueue_phy_queue(ObPhyQueue *phyqueue);
int enqueue_phy_queue(ObPhyQueue &phyqueue);
int dequeue_request(ObIORequest *&req);
int remove_phy_queue(const uint64_t tenant_id);
int notify();
@ -229,7 +232,7 @@ public:
int tg_id_; // thread group id
ObMClockQueue *io_queue_;
ObThreadCond queue_cond_;
hash::ObHashMap<uint64_t, ObTenantPhyQueues *> tenant_map_;
hash::ObHashMap<uint64_t, ObIOCategoryQueues *> tenant_map_;
int64_t sender_req_count_;
};

View File

@ -560,7 +560,8 @@ TEST_F(TestIOStruct, IOCallbackManager)
ASSERT_FAIL(callback_mgr.enqueue_callback(req));
char buf[32] = "test";
req.io_buf_ = buf;
req.copied_callback_ = new (req.callback_buf_) TestIOCallback();
char callback_buf_[ObIOCallback::CALLBACK_BUF_SIZE] __attribute__ ((aligned (16)));
req.copied_callback_ = new (callback_buf_) TestIOCallback();
// ObIOManager::get_instance().io_config_ = ObIOConfig::default_config();
ASSERT_SUCC(callback_mgr.enqueue_callback(req));