diff --git a/src/share/io/ob_io_manager.cpp b/src/share/io/ob_io_manager.cpp index 585bbf050..962d824f0 100644 --- a/src/share/io/ob_io_manager.cpp +++ b/src/share/io/ob_io_manager.cpp @@ -478,8 +478,6 @@ int ObIOManager::add_tenant_io_manager(const uint64_t tenant_id, const ObTenantI } else if (FALSE_IT(tenant_io_mgr = new (buf) ObTenantIOManager())) { } else if (OB_FAIL(tenant_io_mgr->init(tenant_id, tenant_io_config, &io_scheduler_))) { LOG_WARN("init tenant io manager failed", K(ret), K(tenant_id), K(tenant_io_config)); - } else if (OB_FAIL(io_scheduler_.init_group_queues(tenant_id, tenant_io_mgr->get_group_num()))) { - LOG_WARN("init io map failed", K(ret), K(tenant_id), K(tenant_io_config)); } else if (OB_FAIL(tenant_io_mgr->start())) { LOG_WARN("start tenant io manager failed", K(ret), K(tenant_id)); } else { @@ -665,6 +663,8 @@ int ObTenantIOManager::init(const uint64_t tenant_id, LOG_WARN("init io usage failed", K(ret), K(io_usage_)); } else if (OB_FAIL(io_clock_->init(io_config, &io_usage_))) { LOG_WARN("init io clock failed", K(ret), K(io_config)); + } else if (OB_FAIL(io_scheduler->init_group_queues(tenant_id, io_config.group_num_, &io_allocator_))) { + LOG_WARN("init io map failed", K(ret), K(tenant_id), K(io_allocator_)); } else if (OB_FAIL(init_group_index_map(tenant_id, io_config))) { LOG_WARN("init group map failed", K(ret)); } else if (OB_FAIL(io_config_.deep_copy(io_config))) { diff --git a/src/share/io/ob_io_manager.h b/src/share/io/ob_io_manager.h index 5d0a5c145..29c453bb2 100644 --- a/src/share/io/ob_io_manager.h +++ b/src/share/io/ob_io_manager.h @@ -151,6 +151,7 @@ public: int trace_request_if_need(const ObIORequest *req, const char* msg, ObIOTracer::TraceType trace_type); int64_t get_group_num(); uint64_t get_usage_index(const int64_t group_id); + ObIOAllocator *get_tenant_io_allocator() { return &io_allocator_; } void print_io_status(); void inc_ref(); void dec_ref(); diff --git a/src/share/io/ob_io_struct.cpp b/src/share/io/ob_io_struct.cpp index 55992568f..999b87ff9 100644 --- a/src/share/io/ob_io_struct.cpp +++ b/src/share/io/ob_io_struct.cpp @@ -873,16 +873,24 @@ int ObIOSender::init(const int64_t sender_index) struct DestroyGroupqueueMapFn { public: - DestroyGroupqueueMapFn(ObIAllocator &allocator) : allocator_(allocator) {} + DestroyGroupqueueMapFn() {} int operator () (hash::HashMapPair &entry) { + int ret = OB_SUCCESS; if (nullptr != entry.second) { - entry.second->~ObIOGroupQueues(); - allocator_.free(entry.second); + if (is_valid_tenant_id(entry.first)) { + ObRefHolder tenant_holder; + if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(entry.first, tenant_holder))) { + LOG_WARN("get tenant io manager failed", K(ret), K(entry.first)); + } else { + entry.second->~ObIOGroupQueues(); + if (OB_NOT_NULL(tenant_holder.get_ptr()->get_tenant_io_allocator())) { + tenant_holder.get_ptr()->get_tenant_io_allocator()->free(entry.second); + } + } + } } - return OB_SUCCESS; + return ret; } -private: - ObIAllocator &allocator_; }; void ObIOSender::stop() @@ -910,7 +918,7 @@ void ObIOSender::destroy() TG_DESTROY(tg_id_); tg_id_ = -1; } - DestroyGroupqueueMapFn destry_groupqueue_map_fn(allocator_); + DestroyGroupqueueMapFn destry_groupqueue_map_fn; tenant_groups_map_.foreach_refactored(destry_groupqueue_map_fn); tenant_groups_map_.destroy(); queue_cond_.destroy(); @@ -1138,6 +1146,9 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group ObIOGroupQueues *io_group_queues = nullptr; if (OB_FAIL(tenant_groups_map_.get_refactored(tenant_id, io_group_queues))) { LOG_WARN("get_refactored form tenant_group_map failed", K(ret), K(tenant_id)); + } else if (OB_ISNULL(io_group_queues)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("io group queues is null", K(ret), KP(io_group_queues)); } else if (OB_UNLIKELY(!io_group_queues->is_inited_)) { LOG_WARN("io_group_queues not init", K(ret), K(*io_group_queues)); } else if (io_group_queues->group_phy_queues_.count() > group_num || group_num < 0) { @@ -1151,7 +1162,7 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group for (int64_t i = cur_num; OB_SUCC(ret) && i < group_num; ++i) { void *buf = nullptr; ObPhyQueue *tmp_phyqueue = nullptr; - if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObPhyQueue)))) { + if (OB_ISNULL(buf = io_group_queues->allocator_.alloc(sizeof(ObPhyQueue)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate memory failed", K(ret)); } else if (FALSE_IT(tmp_phyqueue = new (buf) ObPhyQueue())) { @@ -1166,7 +1177,7 @@ int ObIOSender::update_group_queue(const uint64_t tenant_id, const int64_t group } if (OB_FAIL(ret) && nullptr != tmp_phyqueue) { tmp_phyqueue->~ObPhyQueue(); - allocator_.free(tmp_phyqueue); + io_group_queues->allocator_.free(tmp_phyqueue); } } } @@ -1202,8 +1213,13 @@ int ObIOSender::remove_group_queues(const uint64_t tenant_id) if(OB_FAIL(io_queue_->remove_from_heap(&(io_group_queues->other_phy_queue_)))) { LOG_WARN("remove other phy queue from heap failed", K(ret)); } else { - io_group_queues->~ObIOGroupQueues(); - allocator_.free(io_group_queues); + ObRefHolder tenant_holder; + if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, tenant_holder))) { + LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id)); + } else { + io_group_queues->~ObIOGroupQueues(); + tenant_holder.get_ptr()->get_tenant_io_allocator()->free(io_group_queues); + } } } } @@ -1549,24 +1565,24 @@ int ObIOScheduler::schedule_request(ObIORequest &req) return ret; } -int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t group_num) +int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t group_num, ObIOAllocator *io_allocator) { int ret = OB_SUCCESS; if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret), K(is_inited_)); - } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || group_num < 0)) { + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || group_num < 0 || OB_ISNULL(io_allocator))) { ret = OB_INVALID_CONFIG; - LOG_WARN("invalid config", K(ret), K(tenant_id), K(group_num)); + LOG_WARN("invalid config", K(ret), K(tenant_id), K(group_num), KP(io_allocator)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < senders_.count(); ++i) { ObIOSender *cur_sender = senders_.at(i); ObIOGroupQueues *io_group_queues = nullptr; void *buf_queues = nullptr; - if (OB_ISNULL(buf_queues = cur_sender->allocator_.alloc(sizeof(ObIOGroupQueues)))) { + if (OB_ISNULL(buf_queues = io_allocator->alloc(sizeof(ObIOGroupQueues)))) { ret = OB_ALLOCATE_MEMORY_FAILED; LOG_WARN("allocate phyqueues memory failed", K(ret)); - } else if (FALSE_IT(io_group_queues = new (buf_queues) ObIOGroupQueues(cur_sender->allocator_))) { + } else if (FALSE_IT(io_group_queues = new (buf_queues) ObIOGroupQueues(*io_allocator))) { } else if (OB_FAIL(io_group_queues->other_phy_queue_.init(INT64_MAX))) { //other group index LOG_WARN("init other group queue failes", K(ret)); } else if (OB_FAIL(io_group_queues->init(group_num))) { @@ -1585,7 +1601,7 @@ int ObIOScheduler::init_group_queues(const uint64_t tenant_id, const int64_t gro } } else { io_group_queues->~ObIOGroupQueues(); - cur_sender->allocator_.free(io_group_queues); + io_allocator->free(io_group_queues); } } } diff --git a/src/share/io/ob_io_struct.h b/src/share/io/ob_io_struct.h index 16acf6710..5bd2ae625 100644 --- a/src/share/io/ob_io_struct.h +++ b/src/share/io/ob_io_struct.h @@ -279,7 +279,7 @@ public: void stop(); void accumulate(const ObIORequest &req); int schedule_request(ObIORequest &req); - int init_group_queues(const uint64_t tenant_id, const int64_t group_num); + int init_group_queues(const uint64_t tenant_id, const int64_t group_num, ObIOAllocator *io_allocator); int update_group_queues(const uint64_t tenant_id, const int64_t group_num); int remove_phyqueues(const uint64_t tenant_id); int stop_phy_queues(const uint64_t tenant_id, const int64_t index);