diff --git a/unittest/storage/test_io_manager.cpp b/unittest/storage/test_io_manager.cpp index 678eca0014..e24273bf7b 100644 --- a/unittest/storage/test_io_manager.cpp +++ b/unittest/storage/test_io_manager.cpp @@ -969,6 +969,24 @@ public: ObIOFd fd_; }; +class IOTracerSwitch : public ThreadPool +{ +public: + IOTracerSwitch() + {} + int init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant); + void destroy(); + virtual void run1() override; + int modify_tenant_io(IOPerfTenant &curr_tenant); + TO_STRING_KV(K(load_), K(switch_init_ts_), K(switch_delay_ts_), K(curr_tenant_), K(load_)); +public: + int64_t switch_init_ts_; + int64_t switch_delay_ts_; + IOPerfTenant curr_tenant_; + ObConcurrentFIFOAllocator allocator_; + IOPerfLoad load_; +}; + #define GROUP_PERF_CONFIG_FILE "io_perf.conf" void write_group_perf_config(); @@ -1134,7 +1152,94 @@ TEST_F(TestIOManager, perf) LOG_INFO("wenqu: perf finished"); } -TEST_F(TestIOManager, modify) +TEST_F(TestIOManager, IOTracer) +{ + // use multi thread to do modify group_io_config + bool is_perf_config_exist = false; + ASSERT_SUCC(FileDirectoryUtils::is_exists(GROUP_PERF_CONFIG_FILE, is_perf_config_exist)); + if (!is_perf_config_exist) { + write_group_perf_config(); + } + // parse configs + IOPerfScheduler scheduler_config; + ObArray perf_devices; + ObArray perf_tenants; + ObArray perf_loads; + ASSERT_SUCC(parse_group_perf_config(GROUP_PERF_CONFIG_FILE, scheduler_config, perf_devices, perf_tenants, perf_loads)); + ASSERT_TRUE(perf_devices.count() > 0); + ASSERT_TRUE(perf_tenants.count() > 0); + ASSERT_TRUE(perf_loads.count() > 0); + + ObIOManager::get_instance().destroy(); + const int64_t memory_limit = 30L * 1024L * 1024L * 1024L; // 30GB + const int64_t queue_depth = 100L; + ASSERT_SUCC(ObIOManager::get_instance().init(memory_limit, queue_depth, scheduler_config.sender_count_, scheduler_config.schedule_media_id_)); + ASSERT_SUCC(ObIOManager::get_instance().start()); + + // prepare devices and files + char *device_buf = (char *)malloc(sizeof(ObLocalDevice) * perf_devices.count()); + ASSERT_TRUE(nullptr != device_buf); + for (int64_t i = 0; i < perf_devices.count(); ++i) { + IOPerfDevice &curr_config = perf_devices.at(i); + ASSERT_SUCC(prepare_file(curr_config.file_path_, curr_config.file_size_, curr_config.fd_)); + ObLocalDevice *device = new (device_buf + sizeof(ObLocalDevice) * i) ObLocalDevice; + ASSERT_SUCC(init_device(curr_config.media_id_, *device)); + ASSERT_SUCC(OB_IO_MANAGER.add_device_channel(device, curr_config.async_channel_count_, curr_config.sync_channel_count_, curr_config.max_io_depth_)); + curr_config.device_handle_ = device; + } + // prepare tenant io manager + for (int64_t i = 0; i < perf_tenants.count(); ++i) { + IOPerfTenant &curr_config = perf_tenants.at(i); + LOG_INFO("wenqu: tenant config", K(curr_config), K(i)); + ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_)); + ObRefHolder tenant_holder; + ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder)); + ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config()); + } + // prepare perf runners + char *runner_buf = (char *)malloc(perf_loads.count() * sizeof(IOPerfRunner)); + char *modifyer_buf = (char *)malloc(perf_loads.count() * sizeof(IOTracerSwitch)); + ObArray runners; + ObArray switches; + const int64_t start_ts = ObTimeUtility::current_time() + 10000L; + for (int64_t i = 0; i < perf_loads.count(); ++i) { + IOPerfRunner *runner = new (runner_buf + i * sizeof(IOPerfRunner)) IOPerfRunner(); + const IOPerfLoad &cur_load = perf_loads.at(i); + ASSERT_SUCC(runner->init(start_ts, cur_load)); + ASSERT_SUCC(runners.push_back(runner)); + LOG_INFO("runner start now"); + } + //open tracer + for (int64_t i = 0; i < perf_tenants.count(); ++i) { + IOTracerSwitch *tracer_switch = new (modifyer_buf + i * sizeof(IOTracerSwitch)) IOTracerSwitch(); + IOPerfTenant &curr_tenant = perf_tenants.at(i); + int64_t switch_init_ts = start_ts; + int64_t switch_delay_ts = 1000000L; //1s后打开开关 + ASSERT_SUCC(tracer_switch->init(switch_init_ts, switch_delay_ts, curr_tenant)); + ASSERT_SUCC(switches.push_back(tracer_switch)); + } + // wait perf finished + for (int64_t i = 0; i < runners.count(); ++i) { + IOPerfRunner *runner = runners.at(i); + runner->wait(); + ASSERT_SUCC(runner->print_result()); + runner->destroy(); + } + free(runner_buf); + free(modifyer_buf); + + ObIOManager::get_instance().stop(); + ObIOManager::get_instance().destroy(); + for (int64_t i = 0; i < perf_devices.count(); ++i) { + ObLocalDevice *device_handle = perf_devices.at(i).device_handle_; +// ASSERT_SUCC(OB_IO_MANAGER.remove_device_channel(device_handle)); + device_handle->destroy(); + } + free(device_buf); + LOG_INFO("wenqu: modify finished"); +} + +TEST_F(TestIOManager, ModifyIOPS) { // use multi thread to do modify group_io_config bool is_perf_config_exist = false; @@ -1857,3 +1962,60 @@ int IOConfModify::modify_tenant_io( const int64_t min_iops, } return ret; } + +int IOTracerSwitch::init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!curr_tenant.is_valid())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", K(ret), K(curr_tenant)); + } else if (OB_FAIL(allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, "perf runner", OB_SERVER_TENANT_ID, 1024L * 1024L * 1024L * 10L))) { + LOG_WARN("init allocator failed", K(ret)); + } else { + curr_tenant_ = curr_tenant; + switch_init_ts_ = switch_init_ts; + switch_delay_ts_ = switch_delay_ts; + } + if (OB_SUCC(ret)) { + if (OB_FAIL(set_thread_count(load_.thread_count_ + 1))) { + LOG_WARN("set thread count failed", K(ret), K(load_)); + } else if (OB_FAIL(start())) { + LOG_WARN("start thread failed", K(ret)); + } + } + if (OB_FAIL(ret)) { + destroy(); + } + return ret; +} + +void IOTracerSwitch::destroy() +{ + stop(); + wait(); + curr_tenant_ = IOPerfTenant(); +} + +void IOTracerSwitch::run1() +{ + int ret = OB_SUCCESS; + const int64_t thread_idx = get_thread_idx(); + LOG_INFO("modify thread start"); + const int64_t current_ts = ObTimeUtility::current_time(); + if (switch_init_ts_ + switch_delay_ts_ > current_ts) { + usleep(switch_init_ts_ + switch_delay_ts_ - current_ts); + } + if (OB_FAIL(modify_tenant_io(curr_tenant_))) { + LOG_WARN("modify config failed", K(ret), K(curr_tenant_)); + } +} + +int IOTracerSwitch::modify_tenant_io(IOPerfTenant &curr_tenant) +{ + int ret = OB_SUCCESS; + ATOMIC_SET(&curr_tenant.config_.enable_io_tracer_, true); + if (OB_FAIL(OB_IO_MANAGER.refresh_tenant_io_config(curr_tenant.tenant_id_, curr_tenant.config_))) { + LOG_WARN("refresh tenant io config failed", K(ret), K(curr_tenant.tenant_id_), K(curr_tenant.config_)); + } + return ret; +} \ No newline at end of file