diff --git a/src/share/io/io_schedule/ob_io_mclock.cpp b/src/share/io/io_schedule/ob_io_mclock.cpp index a89a620ea4..99c9e391d8 100644 --- a/src/share/io/io_schedule/ob_io_mclock.cpp +++ b/src/share/io/io_schedule/ob_io_mclock.cpp @@ -24,6 +24,7 @@ using namespace oceanbase::common; ObMClock::ObMClock() : is_inited_(false), is_stopped_(false), + is_unlimited_(false), reservation_clock_(), limitation_clock_(), proportion_clock_() @@ -52,6 +53,7 @@ int ObMClock::init(const int64_t min_iops, const int64_t max_iops, const int64_t limitation_clock_.last_ns_ = 0; proportion_clock_.iops_ = weight; proportion_clock_.last_ns_ = proportion_ts * 1000L; + is_unlimited_ = false; is_stopped_ = false; is_inited_ = true; } @@ -84,12 +86,14 @@ void ObMClock::start() void ObMClock::stop() { is_stopped_ = true; + is_unlimited_ = true; } void ObMClock::destroy() { is_inited_ = false; is_stopped_ = false; + is_unlimited_ = false; reservation_clock_.reset(); limitation_clock_.reset(); proportion_clock_.reset(); @@ -110,6 +114,11 @@ bool ObMClock::is_stop() const return is_stopped_; } +bool ObMClock::is_unlimited() const +{ + return is_unlimited_; +} + int ObMClock::calc_phy_clock(const int64_t current_ts, const double iops_scale, const double weight_scale, ObPhyQueue *phy_queue) { int ret = OB_SUCCESS; @@ -121,6 +130,11 @@ int ObMClock::calc_phy_clock(const int64_t current_ts, const double iops_scale, || weight_scale <= std::numeric_limits::epsilon())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(current_ts), K(iops_scale), K(weight_scale)); + } else if (is_unlimited_) { + phy_queue->reservation_ts_ = current_ts; + phy_queue->group_limitation_ts_ = current_ts; + phy_queue->tenant_limitation_ts_ = current_ts; + phy_queue->proportion_ts_ = current_ts; } else { reservation_clock_.atom_update(current_ts, iops_scale, phy_queue->reservation_ts_); limitation_clock_.atom_update(current_ts, iops_scale, phy_queue->group_limitation_ts_); @@ -489,6 +503,14 @@ int ObTenantIOClock::update_clock_unit_config(const ObTenantIOConfig &io_config) return ret; } +bool ObTenantIOClock::is_unlimited_config(const ObMClock &clock, const ObTenantIOConfig::GroupConfig &cur_config) +{ + return clock.is_stop() || + cur_config.deleted_ || + cur_config.cleared_ || + (cur_config.min_percent_ == INT64_MAX && cur_config.max_percent_ == INT64_MAX); +} + int ObTenantIOClock::update_io_clock(const int64_t index, const ObTenantIOConfig &io_config, const int64_t all_group_num) { int ret = OB_SUCCESS; @@ -518,8 +540,9 @@ int ObTenantIOClock::update_io_clock(const int64_t index, const ObTenantIOConfig const ObTenantIOConfig::GroupConfig &cur_config = io_config.group_configs_.at(index); if (!group_clocks_.at(index).is_inited()) { LOG_WARN("clock is not init", K(ret), K(index), K(group_clocks_.at(index))); - } else if (group_clocks_.at(index).is_stop() || cur_config.deleted_ || cur_config.cleared_) { - // group has been deleted, ignore + } else if (is_unlimited_config(group_clocks_.at(index), cur_config)) { + group_clocks_.at(index).set_unlimited(); + LOG_INFO("clock set unlimited", K(group_clocks_.at(index)), K(cur_config)); } else if (!cur_config.is_valid()) { LOG_WARN("config is not valid", K(ret), K(index), K(cur_config), K(group_clocks_.at(index))); // stop @@ -531,6 +554,9 @@ int ObTenantIOClock::update_io_clock(const int64_t index, const ObTenantIOConfig LOG_WARN("update group io clock failed", K(ret), K(index), K(unit_config), K(cur_config)); } else { group_clocks_.at(index).start(); + if (!is_unlimited_config(group_clocks_.at(index), cur_config)) { + group_clocks_.at(index).set_limited(); + } LOG_INFO("update group clock success", K(index), K(unit_config), K(cur_config)); } } else { diff --git a/src/share/io/io_schedule/ob_io_mclock.h b/src/share/io/io_schedule/ob_io_mclock.h index 8bf0e141d7..209ce4e498 100644 --- a/src/share/io/io_schedule/ob_io_mclock.h +++ b/src/share/io/io_schedule/ob_io_mclock.h @@ -33,18 +33,22 @@ public: void start(); void stop(); void destroy(); + void set_unlimited() { is_unlimited_ = true; } + void set_limited() { is_unlimited_ = false; } bool is_inited() const; bool is_valid() const; bool is_stop() const; + bool is_unlimited() const; int calc_phy_clock(const int64_t current_ts, const double iops_scale, const double weight_scale, ObPhyQueue *phy_queue); int dial_back_reservation_clock(const double iops_scale); int time_out_dial_back(const double iops_scale, const double weight_scale); int dial_back_proportion_clock(const int64_t delta_us); int64_t get_proportion_ts() const; - TO_STRING_KV(K(is_inited_), K(is_stopped_), K_(reservation_clock), K_(limitation_clock), K_(proportion_clock)); + TO_STRING_KV(K(is_inited_), K(is_stopped_), K_(reservation_clock), K_(is_unlimited), K_(limitation_clock), K_(proportion_clock)); private: bool is_inited_; bool is_stopped_; + bool is_unlimited_; // use this flag to send io_req in useless_queue out ASAP ObAtomIOClock reservation_clock_; ObAtomIOClock limitation_clock_; ObAtomIOClock proportion_clock_; @@ -68,6 +72,7 @@ public: int update_io_clock(const int64_t index, const ObTenantIOConfig &io_config, const int64_t all_group_num); int update_clock_unit_config(const ObTenantIOConfig &io_config); int64_t get_min_proportion_ts(); + bool is_unlimited_config(const ObMClock &clock, const ObTenantIOConfig::GroupConfig &cur_config); void stop_clock(const uint64_t index); TO_STRING_KV(K(is_inited_), "group_clocks", group_clocks_, "other_clock", other_group_clock_, K_(unit_clock), K(io_config_), K(io_usage_)); diff --git a/src/share/io/ob_io_calibration.cpp b/src/share/io/ob_io_calibration.cpp index dc71cbc1ac..77b11571fc 100644 --- a/src/share/io/ob_io_calibration.cpp +++ b/src/share/io/ob_io_calibration.cpp @@ -702,10 +702,6 @@ int ObIOCalibration::read_from_table() if (OB_FAIL(update_io_ability(tmp_ability))) { LOG_WARN("update io ability failed", K(ret), K(tmp_ability)); } - } else { - if (OB_FAIL(reset_io_ability())) { - LOG_WARN("reset io ability failed", K(ret)); - } } return ret; } diff --git a/src/share/io/ob_io_define.cpp b/src/share/io/ob_io_define.cpp index 379c408b60..af92e6a1b3 100644 --- a/src/share/io/ob_io_define.cpp +++ b/src/share/io/ob_io_define.cpp @@ -1770,7 +1770,15 @@ int64_t ObTenantIOConfig::to_string(char* buf, const int64_t buf_len) const bool need_comma = false; for (int64_t i = 0; i < group_configs_.count(); ++i) { if (group_configs_.at(i).deleted_ || group_configs_.at(i).cleared_) { - continue; + if (need_comma) { + J_COMMA(); + } + BUF_PRINTF("group_id = "); + char group_id[8]; + snprintf(group_id, sizeof(group_id), "%ld", group_ids_.at(i)); + J_KV(group_id, group_configs_.at(i).deleted_); + J_KV(" cleared", group_configs_.at(i).cleared_); + need_comma = true; } else if (!self_valid || group_configs_.at(i).is_valid()) { if (need_comma) { J_COMMA(); @@ -2087,6 +2095,16 @@ int ObMClockQueue::pop_with_ready_queue(const int64_t current_ts, ObIORequest *& ret = OB_EAGAIN; if (!r_heap_.empty() && !r_heap_.top()->req_list_.is_empty()) { ObPhyQueue *next_tmp_phy_queue = r_heap_.top(); + if (OB_UNLIKELY(next_tmp_phy_queue->reservation_ts_ == INT64_MAX && + (next_tmp_phy_queue->group_limitation_ts_ == INT64_MAX || next_tmp_phy_queue->tenant_limitation_ts_== INT64_MAX) && + next_tmp_phy_queue->proportion_ts_== INT64_MAX)) { + // 对应min = max = 0的极端场景 + const int64_t current_ts = ObTimeUtility::fast_current_time(); + next_tmp_phy_queue->reservation_ts_ = current_ts; + next_tmp_phy_queue->group_limitation_ts_ = current_ts; + next_tmp_phy_queue->tenant_limitation_ts_ = current_ts; + next_tmp_phy_queue->proportion_ts_ = current_ts; + } if (0 == deadline_ts) { deadline_ts = next_tmp_phy_queue->reservation_ts_; } else { diff --git a/src/share/io/ob_io_manager.cpp b/src/share/io/ob_io_manager.cpp index 5bc210c1f5..cd72d541cc 100644 --- a/src/share/io/ob_io_manager.cpp +++ b/src/share/io/ob_io_manager.cpp @@ -563,6 +563,28 @@ int ObIOManager::refresh_tenant_io_config(const uint64_t tenant_id, const ObTena return ret; } +// for unittest +int ObIOManager::modify_group_io_config(const uint64_t tenant_id, + const uint64_t index, + const int64_t min_percent, + const int64_t max_percent, + const int64_t weight_percent) +{ + int ret = OB_SUCCESS; + ObRefHolder tenant_holder; + if (OB_UNLIKELY(!is_inited_)) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret), K(is_inited_)); + } else if (OB_FAIL(get_tenant_io_manager(tenant_id, tenant_holder))) { + LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id)); + } else if (OB_FAIL(tenant_holder.get_ptr()->modify_group_io_config(index, min_percent, max_percent, weight_percent, false, false))) { + LOG_WARN("update tenant io config failed", K(ret), K(tenant_id), K(min_percent), K(max_percent), K(weight_percent)); + } else if (OB_FAIL(tenant_holder.get_ptr()->refresh_group_io_config())) { + LOG_WARN("fail to refresh group config", K(ret)); + } + return ret; +} + int ObIOManager::get_tenant_io_manager(const uint64_t tenant_id, ObRefHolder &tenant_holder) { int ret = OB_SUCCESS; @@ -1368,7 +1390,7 @@ int ObTenantIOManager::reset_all_group_config() for (int64_t i = 0; i < io_config_.group_num_; ++i) { if(io_config_.group_configs_.at(i).deleted_) { //do nothing - } else if (OB_FAIL(modify_group_io_config(i, 0, 0, 0, false, true/*cleared*/))) { + } else if (OB_FAIL(modify_group_io_config(i, 0, 100, 0, false, true/*cleared*/))) { LOG_WARN("modify group io config failed", K(ret), K(i)); } } @@ -1409,7 +1431,7 @@ int ObTenantIOManager::reset_consumer_group_config(const int64_t group_id) } else if (OB_UNLIKELY(index == INT64_MAX || index < 0)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid index, maybe try to reset OTHER_GROUPS or deleted_groups", K(ret), K(index), K(group_id)); - } else if (OB_FAIL(modify_group_io_config(index, 0, 0, 0, false, true/*cleared*/))) { + } else if (OB_FAIL(modify_group_io_config(index, 0, 100, 0, false, true/*cleared*/))) { LOG_WARN("modify group io config failed", K(ret), K(tenant_id_), K(index)); } else { LOG_INFO ("stop group io control success when delete directive", K(tenant_id_), K(group_id), K(index), K(io_config_)); @@ -1458,7 +1480,7 @@ int ObTenantIOManager::delete_consumer_group_config(const int64_t group_id) } else { if (OB_FAIL(group_id_index_map_.set_refactored(group_id, INT64_MAX, 1))) { //使用非法值覆盖 LOG_WARN("stop phy queues failed", K(ret), K(tenant_id_), K(index)); - } else if (OB_FAIL(modify_group_io_config(index, 0, 0, 0, true/*deleted*/, false))) { + } else if (OB_FAIL(modify_group_io_config(index, 0, 100, 0, true/*deleted*/, false))) { LOG_WARN("modify group io config failed", K(ret), K(tenant_id_), K(index)); } } diff --git a/src/share/io/ob_io_manager.h b/src/share/io/ob_io_manager.h index 93bf0c18fc..bbdf2e8e7d 100644 --- a/src/share/io/ob_io_manager.h +++ b/src/share/io/ob_io_manager.h @@ -73,6 +73,11 @@ public: int refresh_tenant_io_config(const uint64_t tenant_id, const ObTenantIOConfig &tenant_io_config); int get_tenant_io_manager(const uint64_t tenant_id, ObRefHolder &tenant_holder); int get_tenant_ids(ObIArray &tenant_ids); + int modify_group_io_config(const uint64_t tenant_id, + const uint64_t index, + const int64_t min_percent, + const int64_t max_percent, + const int64_t weight_percent); ObIOScheduler *get_scheduler(); private: diff --git a/unittest/storage/test_io_manager.cpp b/unittest/storage/test_io_manager.cpp index 7ae2a82a4d..af203d8153 100644 --- a/unittest/storage/test_io_manager.cpp +++ b/unittest/storage/test_io_manager.cpp @@ -1181,6 +1181,25 @@ public: ObIOFd fd_; }; +class IOGroupModify : public ThreadPool +{ +public: + IOGroupModify() + : modify_init_ts_(0) + {} + int init(int64_t modify_init_ts, int64_t modify_delay_ts, const IOPerfTenant &curr_tenant); + void destroy(); + virtual void run1() override; + TO_STRING_KV(K(load_), K(modify_delay_ts_), K(fd_), K(curr_tenant_)); +public: + int64_t modify_init_ts_; + int64_t modify_delay_ts_; + IOPerfTenant curr_tenant_; + ObConcurrentFIFOAllocator allocator_; + IOPerfLoad load_; + ObIOFd fd_; +}; + class IOTracerSwitch : public ThreadPool { public: @@ -1695,6 +1714,97 @@ TEST_F(TestIOManager, ModifyCallbackThread) LOG_INFO("modify callback thread finished"); } +TEST_F(TestIOManager, ModifyGroupIO) +{ + // use multi thread to do modify group_io_config + bool is_perf_config_exist = false; + ASSERT_SUCC(FileDirectoryUtils::is_exists(GROUP_PERF_CONFIG_FILE, is_perf_config_exist)); + if (!is_perf_config_exist) { + write_group_perf_config(); + } + // parse configs + IOPerfScheduler scheduler_config; + ObArray perf_devices; + ObArray perf_tenants; + ObArray perf_loads; + ASSERT_SUCC(parse_group_perf_config(GROUP_PERF_CONFIG_FILE, scheduler_config, perf_devices, perf_tenants, perf_loads)); + ASSERT_TRUE(perf_devices.count() > 0); + ASSERT_TRUE(perf_tenants.count() > 0); + ASSERT_TRUE(perf_loads.count() > 0); + ObIOManager::get_instance().destroy(); + const int64_t memory_limit = 30L * 1024L * 1024L * 1024L; // 30GB + const int64_t queue_depth = 100L; + ASSERT_SUCC(ObIOManager::get_instance().init(memory_limit, queue_depth, scheduler_config.sender_count_, scheduler_config.schedule_media_id_)); + ASSERT_SUCC(ObIOManager::get_instance().start()); + // prepare devices and files + char *device_buf = (char *)malloc(sizeof(ObLocalDevice) * perf_devices.count()); + ASSERT_TRUE(nullptr != device_buf); + for (int64_t i = 0; i < perf_devices.count(); ++i) { + IOPerfDevice &curr_config = perf_devices.at(i); + ASSERT_SUCC(prepare_file(curr_config.file_path_, curr_config.file_size_, curr_config.fd_)); + ObLocalDevice *device = new (device_buf + sizeof(ObLocalDevice) * i) ObLocalDevice; + ASSERT_SUCC(init_device(curr_config.media_id_, *device)); + ASSERT_SUCC(OB_IO_MANAGER.add_device_channel(device, curr_config.async_channel_count_, curr_config.sync_channel_count_, curr_config.max_io_depth_)); + curr_config.device_handle_ = device; + } + // prepare tenant io manager + for (int64_t i = 0; i < perf_tenants.count(); ++i) { + IOPerfTenant &curr_config = perf_tenants.at(i); + if (curr_config.tenant_id_ == 1002) { + LOG_INFO("qilu: tenant config", K(curr_config), K(i)); + ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_)); + ObRefHolder tenant_holder; + ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder)); + ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config()); + } + } + // prepare perf runners + char *runner_buf = (char *)malloc(perf_loads.count() * sizeof(IOPerfRunner)); + char *modifyer_buf = (char *)malloc(perf_loads.count() * sizeof(IOGroupModify)); + ObArray runners; + ObArray modifyers; + const int64_t start_ts = ObTimeUtility::current_time() + 10000L; + for (int64_t i = 0; i < perf_loads.count(); ++i) { + IOPerfRunner *runner = new (runner_buf + i * sizeof(IOPerfRunner)) IOPerfRunner(); + const IOPerfLoad &cur_load = perf_loads.at(i); + if (cur_load.tenant_id_ == 1002) { + ASSERT_SUCC(runner->init(start_ts, cur_load)); + ASSERT_SUCC(runners.push_back(runner)); + LOG_INFO("runner start now"); + } + } + //prepare modifyer + for (int64_t i = 0; i < perf_tenants.count(); ++i) { + IOPerfTenant &curr_tenant = perf_tenants.at(i); + if (curr_tenant.tenant_id_ == 1002) { + IOGroupModify *modifyer=new (modifyer_buf + i * sizeof(IOGroupModify)) IOGroupModify(); + int64_t modify_init_ts = start_ts; + int64_t modify_delay_ts = 3000000L; //3s后开始修改 + ASSERT_SUCC(modifyer->init(modify_init_ts, modify_delay_ts, curr_tenant)); + ASSERT_SUCC(modifyers.push_back(modifyer)); + } + } + // wait perf finished + for (int64_t i = 0; i < runners.count(); ++i) { + IOPerfRunner *runner = runners.at(i); + runner->wait(); + ASSERT_SUCC(runner->print_result()); + runner->destroy(); + } + free(runner_buf); + free(modifyer_buf); + ObIOManager::get_instance().stop(); + ObIOManager::get_instance().destroy(); + for (int64_t i = 0; i < perf_devices.count(); ++i) { + ObLocalDevice *device_handle = perf_devices.at(i).device_handle_; +// ASSERT_SUCC(OB_IO_MANAGER.remove_device_channel(device_handle)); + device_handle->destroy(); + } + free(device_buf); + LOG_INFO("qilu: modify group finished"); +} + + TEST_F(TestIOManager, abnormal) { // simulate submit failure @@ -1783,13 +1893,14 @@ void write_group_perf_config() "\n" "tenant_id min_iops max_iops weight group\n" "1001 5000 100000 700 10001: testgroup1: 80, 100, 60; 10002: testgroup2: 10, 60, 30; 0: OTHER_GROUPS: 10, 100, 10;\n" - "1002 1000 50000 1000 0: testgroup1: 100, 100, 100;\n" + "1002 1000 50000 1000 12345: testgroup1: 50, 50, 50; 0: OTHER_GROUPS: 50, 50, 50;\n" "\n" "tenant_id device_id group io_mode io_size_byte io_depth perf_mode target_iops thread_count is_sequence start_s stop_s\n" "1001 1 0 r 16384 10 rolling 0 16 0 0 8\n" "1001 1 10001 r 16384 10 rolling 0 16 0 2 7\n" "1001 1 10002 r 16384 10 rolling 0 16 0 0 6\n" - "1002 1 0 r 16384 10 rolling 0 16 0 0 8\n" + "1002 1 0 r 16384 100 rolling 0 16 0 0 10\n" + "1002 1 12345 r 16384 100 rolling 0 16 0 0 10\n" ; const int64_t file_len = strlen(file_buf); int write_ret = ::write(fd, file_buf, file_len); @@ -2344,6 +2455,65 @@ int IOConfModify::modify_tenant_io( const int64_t min_iops, return ret; } +int IOGroupModify::init(int64_t modify_init_ts, int64_t modify_delay_ts, const IOPerfTenant &curr_tenant) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!curr_tenant.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(curr_tenant)); + } else if (OB_FAIL(allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, "group modifier", OB_SERVER_TENANT_ID, 1024L * 1024L * 1024L * 10L))) { + LOG_WARN("init allocator failed", K(ret)); + } else { + curr_tenant_ = curr_tenant; + modify_init_ts_ = modify_init_ts; + modify_delay_ts_ = modify_delay_ts; + } + if (OB_SUCC(ret)) { + if (OB_FAIL(set_thread_count(load_.thread_count_ + 1))) { + LOG_WARN("set thread count failed", K(ret), K(modify_init_ts_), K(curr_tenant_)); + } else if (OB_FAIL(start())) { + LOG_WARN("start thread failed", K(ret), K(modify_init_ts_), K(curr_tenant_)); + } + } + if (OB_FAIL(ret)) { + destroy(); + } + return ret; +} + +void IOGroupModify::destroy() +{ + stop(); + wait(); + curr_tenant_ = IOPerfTenant(); +} + +void IOGroupModify::run1() +{ + int ret = OB_SUCCESS; + const int64_t thread_idx = get_thread_idx(); + LOG_INFO("modify thread start"); + + //change 1 + int64_t current_ts = ObTimeUtility::current_time(); + if (modify_init_ts_ + modify_delay_ts_ > current_ts) { + usleep(modify_init_ts_ + modify_delay_ts_ - current_ts); + } + if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, INT64_MAX, 100, 100, 100))) { + LOG_WARN("fail to modify group config", K(ret)); + } else if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, 0, 0, 0, 0))) { + LOG_WARN("fail to modify group config", K(ret)); + } + + //change 2 + usleep(3000L * 1000L); // sleep 3s + if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, INT64_MAX, 40, 40, 40))) { + LOG_WARN("fail to modify group config", K(ret)); + } else if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, 0, 60, 60, 60))) { + LOG_WARN("fail to modify group config", K(ret)); + } +} + int IOTracerSwitch::init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant) { int ret = OB_SUCCESS;