[CP]Split the memory of the physical queue into tenants

This commit is contained in:
renju96 2023-06-21 22:42:45 +00:00 committed by ob-robot
parent d2198fb65f
commit 7dac9a0f5f
4 changed files with 37 additions and 20 deletions

View File

@ -478,8 +478,6 @@ int ObIOManager::add_tenant_io_manager(const uint64_t tenant_id, const ObTenantI
} else if (FALSE_IT(tenant_io_mgr = new (buf) ObTenantIOManager())) {
} else if (OB_FAIL(tenant_io_mgr->init(tenant_id, tenant_io_config, &io_scheduler_))) {
LOG_WARN("init tenant io manager failed", K(ret), K(tenant_id), K(tenant_io_config));
} else if (OB_FAIL(io_scheduler_.init_group_queues(tenant_id, tenant_io_mgr->get_group_num()))) {
LOG_WARN("init io map failed", K(ret), K(tenant_id), K(tenant_io_config));
} else if (OB_FAIL(tenant_io_mgr->start())) {
LOG_WARN("start tenant io manager failed", K(ret), K(tenant_id));
} else {
@ -665,6 +663,8 @@ int ObTenantIOManager::init(const uint64_t tenant_id,
LOG_WARN("init io usage failed", K(ret), K(io_usage_));
} else if (OB_FAIL(io_clock_->init(io_config, &io_usage_))) {
LOG_WARN("init io clock failed", K(ret), K(io_config));
} else if (OB_FAIL(io_scheduler->init_group_queues(tenant_id, io_config.group_num_, &io_allocator_))) {
LOG_WARN("init io map failed", K(ret), K(tenant_id), K(io_allocator_));
} else if (OB_FAIL(init_group_index_map(tenant_id, io_config))) {
LOG_WARN("init group map failed", K(ret));
} else if (OB_FAIL(io_config_.deep_copy(io_config))) {

View File

@ -151,6 +151,7 @@ public:
int trace_request_if_need(const ObIORequest *req, const char* msg, ObIOTracer::TraceType trace_type);
int64_t get_group_num();
uint64_t get_usage_index(const int64_t group_id);
ObIOAllocator *get_tenant_io_allocator() { return &io_allocator_; }
void print_io_status();
void inc_ref();
void dec_ref();

View File

@ -873,16 +873,24 @@ int ObIOSender::init(const int64_t sender_index)
struct DestroyGroupqueueMapFn
{
public:
DestroyGroupqueueMapFn(ObIAllocator &allocator) : allocator_(allocator) {}
DestroyGroupqueueMapFn() {}
int operator () (hash::HashMapPair<uint64_t, ObIOGroupQueues *> &entry) {
int ret = OB_SUCCESS;
if (nullptr != entry.second) {
entry.second->~ObIOGroupQueues();
allocator_.free(entry.second);
if (is_valid_tenant_id(entry.first)) {
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(entry.first, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(entry.first));
} else {
entry.second->~ObIOGroupQueues();
if (OB_NOT_NULL(tenant_holder.get_ptr()->get_tenant_io_allocator())) {
tenant_holder.get_ptr()->get_tenant_io_allocator()->free(entry.second);
}
}
}
}
return OB_SUCCESS;
return ret;
}
private:
ObIAllocator &allocator_;
};
void ObIOSender::stop()
@ -910,7 +918,7 @@ void ObIOSender::destroy()
TG_DESTROY(tg_id_);
tg_id_ = -1;
}
DestroyGroupqueueMapFn destry_groupqueue_map_fn(allocator_);
DestroyGroupqueueMapFn destry_groupqueue_map_fn;
tenant_groups_map_.foreach_refactored(destry_groupqueue_map_fn);
tenant_groups_map_.destroy();
queue_cond_.destroy();
@ -1138,6 +1146,9 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group
ObIOGroupQueues *io_group_queues = nullptr;
if (OB_FAIL(tenant_groups_map_.get_refactored(tenant_id, io_group_queues))) {
LOG_WARN("get_refactored form tenant_group_map failed", K(ret), K(tenant_id));
} else if (OB_ISNULL(io_group_queues)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("io group queues is null", K(ret), KP(io_group_queues));
} else if (OB_UNLIKELY(!io_group_queues->is_inited_)) {
LOG_WARN("io_group_queues not init", K(ret), K(*io_group_queues));
} else if (io_group_queues->group_phy_queues_.count() > group_num || group_num < 0) {
@ -1151,7 +1162,7 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group
for (int64_t i = cur_num; OB_SUCC(ret) && i < group_num; ++i) {
void *buf = nullptr;
ObPhyQueue *tmp_phyqueue = nullptr;
if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObPhyQueue)))) {
if (OB_ISNULL(buf = io_group_queues->allocator_.alloc(sizeof(ObPhyQueue)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate memory failed", K(ret));
} else if (FALSE_IT(tmp_phyqueue = new (buf) ObPhyQueue())) {
@ -1166,7 +1177,7 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group
}
if (OB_FAIL(ret) && nullptr != tmp_phyqueue) {
tmp_phyqueue->~ObPhyQueue();
allocator_.free(tmp_phyqueue);
io_group_queues->allocator_.free(tmp_phyqueue);
}
}
}
@ -1202,8 +1213,13 @@ int ObIOSender::remove_group_queues(const uint64_t tenant_id)
if(OB_FAIL(io_queue_->remove_from_heap(&(io_group_queues->other_phy_queue_)))) {
LOG_WARN("remove other phy queue from heap failed", K(ret));
} else {
io_group_queues->~ObIOGroupQueues();
allocator_.free(io_group_queues);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id));
} else {
io_group_queues->~ObIOGroupQueues();
tenant_holder.get_ptr()->get_tenant_io_allocator()->free(io_group_queues);
}
}
}
}
@ -1549,24 +1565,24 @@ int ObIOScheduler::schedule_request(ObIORequest &req)
return ret;
}
int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t group_num)
int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t group_num, ObIOAllocator *io_allocator)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || group_num < 0)) {
} else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || group_num < 0 || OB_ISNULL(io_allocator))) {
ret = OB_INVALID_CONFIG;
LOG_WARN("invalid config", K(ret), K(tenant_id), K(group_num));
LOG_WARN("invalid config", K(ret), K(tenant_id), K(group_num), KP(io_allocator));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < senders_.count(); ++i) {
ObIOSender *cur_sender = senders_.at(i);
ObIOGroupQueues *io_group_queues = nullptr;
void *buf_queues = nullptr;
if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObIOGroupQueues)))) {
if (OB_ISNULL(buf_queues = io_allocator->alloc(sizeof(ObIOGroupQueues)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("allocate phyqueues memory failed", K(ret));
} else if (FALSE_IT(io_group_queues = new (buf_queues) ObIOGroupQueues(cur_sender->allocator_))) {
} else if (FALSE_IT(io_group_queues = new (buf_queues) ObIOGroupQueues(*io_allocator))) {
} else if (OB_FAIL(io_group_queues->other_phy_queue_.init(INT64_MAX))) { //other group index
LOG_WARN("init other group queue failes", K(ret));
} else if (OB_FAIL(io_group_queues->init(group_num))) {
@ -1585,7 +1601,7 @@ int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t gro
}
} else {
io_group_queues->~ObIOGroupQueues();
cur_sender->allocator_.free(io_group_queues);
io_allocator->free(io_group_queues);
}
}
}

View File

@ -279,7 +279,7 @@ public:
void stop();
void accumulate(const ObIORequest &req);
int schedule_request(ObIORequest &req);
int init_group_queues(const uint64_t tenant_id, const int64_t group_num);
int init_group_queues(const uint64_t tenant_id, const int64_t group_num, ObIOAllocator *io_allocator);
int update_group_queues(const uint64_t tenant_id, const int64_t group_num);
int remove_phyqueues(const uint64_t tenant_id);
int stop_phy_queues(const uint64_t tenant_id, const int64_t index);