add unittest for io_tracer
This commit is contained in:
@ -969,6 +969,24 @@ public:
|
||||
ObIOFd fd_;
|
||||
};
|
||||
|
||||
class IOTracerSwitch : public ThreadPool
|
||||
{
|
||||
public:
|
||||
IOTracerSwitch()
|
||||
{}
|
||||
int init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant);
|
||||
void destroy();
|
||||
virtual void run1() override;
|
||||
int modify_tenant_io(IOPerfTenant &curr_tenant);
|
||||
TO_STRING_KV(K(load_), K(switch_init_ts_), K(switch_delay_ts_), K(curr_tenant_), K(load_));
|
||||
public:
|
||||
int64_t switch_init_ts_;
|
||||
int64_t switch_delay_ts_;
|
||||
IOPerfTenant curr_tenant_;
|
||||
ObConcurrentFIFOAllocator allocator_;
|
||||
IOPerfLoad load_;
|
||||
};
|
||||
|
||||
#define GROUP_PERF_CONFIG_FILE "io_perf.conf"
|
||||
|
||||
void write_group_perf_config();
|
||||
@ -1134,7 +1152,94 @@ TEST_F(TestIOManager, perf)
|
||||
LOG_INFO("wenqu: perf finished");
|
||||
}
|
||||
|
||||
TEST_F(TestIOManager, modify)
|
||||
TEST_F(TestIOManager, IOTracer)
|
||||
{
|
||||
// use multi thread to do modify group_io_config
|
||||
bool is_perf_config_exist = false;
|
||||
ASSERT_SUCC(FileDirectoryUtils::is_exists(GROUP_PERF_CONFIG_FILE, is_perf_config_exist));
|
||||
if (!is_perf_config_exist) {
|
||||
write_group_perf_config();
|
||||
}
|
||||
// parse configs
|
||||
IOPerfScheduler scheduler_config;
|
||||
ObArray<IOPerfDevice> perf_devices;
|
||||
ObArray<IOPerfTenant> perf_tenants;
|
||||
ObArray<IOPerfLoad> perf_loads;
|
||||
ASSERT_SUCC(parse_group_perf_config(GROUP_PERF_CONFIG_FILE, scheduler_config, perf_devices, perf_tenants, perf_loads));
|
||||
ASSERT_TRUE(perf_devices.count() > 0);
|
||||
ASSERT_TRUE(perf_tenants.count() > 0);
|
||||
ASSERT_TRUE(perf_loads.count() > 0);
|
||||
|
||||
ObIOManager::get_instance().destroy();
|
||||
const int64_t memory_limit = 30L * 1024L * 1024L * 1024L; // 30GB
|
||||
const int64_t queue_depth = 100L;
|
||||
ASSERT_SUCC(ObIOManager::get_instance().init(memory_limit, queue_depth, scheduler_config.sender_count_, scheduler_config.schedule_media_id_));
|
||||
ASSERT_SUCC(ObIOManager::get_instance().start());
|
||||
|
||||
// prepare devices and files
|
||||
char *device_buf = (char *)malloc(sizeof(ObLocalDevice) * perf_devices.count());
|
||||
ASSERT_TRUE(nullptr != device_buf);
|
||||
for (int64_t i = 0; i < perf_devices.count(); ++i) {
|
||||
IOPerfDevice &curr_config = perf_devices.at(i);
|
||||
ASSERT_SUCC(prepare_file(curr_config.file_path_, curr_config.file_size_, curr_config.fd_));
|
||||
ObLocalDevice *device = new (device_buf + sizeof(ObLocalDevice) * i) ObLocalDevice;
|
||||
ASSERT_SUCC(init_device(curr_config.media_id_, *device));
|
||||
ASSERT_SUCC(OB_IO_MANAGER.add_device_channel(device, curr_config.async_channel_count_, curr_config.sync_channel_count_, curr_config.max_io_depth_));
|
||||
curr_config.device_handle_ = device;
|
||||
}
|
||||
// prepare tenant io manager
|
||||
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
|
||||
IOPerfTenant &curr_config = perf_tenants.at(i);
|
||||
LOG_INFO("wenqu: tenant config", K(curr_config), K(i));
|
||||
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
|
||||
ObRefHolder<ObTenantIOManager> tenant_holder;
|
||||
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
|
||||
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
|
||||
}
|
||||
// prepare perf runners
|
||||
char *runner_buf = (char *)malloc(perf_loads.count() * sizeof(IOPerfRunner));
|
||||
char *modifyer_buf = (char *)malloc(perf_loads.count() * sizeof(IOTracerSwitch));
|
||||
ObArray<IOPerfRunner *> runners;
|
||||
ObArray<IOTracerSwitch *> switches;
|
||||
const int64_t start_ts = ObTimeUtility::current_time() + 10000L;
|
||||
for (int64_t i = 0; i < perf_loads.count(); ++i) {
|
||||
IOPerfRunner *runner = new (runner_buf + i * sizeof(IOPerfRunner)) IOPerfRunner();
|
||||
const IOPerfLoad &cur_load = perf_loads.at(i);
|
||||
ASSERT_SUCC(runner->init(start_ts, cur_load));
|
||||
ASSERT_SUCC(runners.push_back(runner));
|
||||
LOG_INFO("runner start now");
|
||||
}
|
||||
//open tracer
|
||||
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
|
||||
IOTracerSwitch *tracer_switch = new (modifyer_buf + i * sizeof(IOTracerSwitch)) IOTracerSwitch();
|
||||
IOPerfTenant &curr_tenant = perf_tenants.at(i);
|
||||
int64_t switch_init_ts = start_ts;
|
||||
int64_t switch_delay_ts = 1000000L; //1s后打开开关
|
||||
ASSERT_SUCC(tracer_switch->init(switch_init_ts, switch_delay_ts, curr_tenant));
|
||||
ASSERT_SUCC(switches.push_back(tracer_switch));
|
||||
}
|
||||
// wait perf finished
|
||||
for (int64_t i = 0; i < runners.count(); ++i) {
|
||||
IOPerfRunner *runner = runners.at(i);
|
||||
runner->wait();
|
||||
ASSERT_SUCC(runner->print_result());
|
||||
runner->destroy();
|
||||
}
|
||||
free(runner_buf);
|
||||
free(modifyer_buf);
|
||||
|
||||
ObIOManager::get_instance().stop();
|
||||
ObIOManager::get_instance().destroy();
|
||||
for (int64_t i = 0; i < perf_devices.count(); ++i) {
|
||||
ObLocalDevice *device_handle = perf_devices.at(i).device_handle_;
|
||||
// ASSERT_SUCC(OB_IO_MANAGER.remove_device_channel(device_handle));
|
||||
device_handle->destroy();
|
||||
}
|
||||
free(device_buf);
|
||||
LOG_INFO("wenqu: modify finished");
|
||||
}
|
||||
|
||||
TEST_F(TestIOManager, ModifyIOPS)
|
||||
{
|
||||
// use multi thread to do modify group_io_config
|
||||
bool is_perf_config_exist = false;
|
||||
@ -1857,3 +1962,60 @@ int IOConfModify::modify_tenant_io( const int64_t min_iops,
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int IOTracerSwitch::init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_UNLIKELY(!curr_tenant.is_valid())) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("invalid argument", K(ret), K(curr_tenant));
|
||||
} else if (OB_FAIL(allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, "perf runner", OB_SERVER_TENANT_ID, 1024L * 1024L * 1024L * 10L))) {
|
||||
LOG_WARN("init allocator failed", K(ret));
|
||||
} else {
|
||||
curr_tenant_ = curr_tenant;
|
||||
switch_init_ts_ = switch_init_ts;
|
||||
switch_delay_ts_ = switch_delay_ts;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_FAIL(set_thread_count(load_.thread_count_ + 1))) {
|
||||
LOG_WARN("set thread count failed", K(ret), K(load_));
|
||||
} else if (OB_FAIL(start())) {
|
||||
LOG_WARN("start thread failed", K(ret));
|
||||
}
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
destroy();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void IOTracerSwitch::destroy()
|
||||
{
|
||||
stop();
|
||||
wait();
|
||||
curr_tenant_ = IOPerfTenant();
|
||||
}
|
||||
|
||||
void IOTracerSwitch::run1()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t thread_idx = get_thread_idx();
|
||||
LOG_INFO("modify thread start");
|
||||
const int64_t current_ts = ObTimeUtility::current_time();
|
||||
if (switch_init_ts_ + switch_delay_ts_ > current_ts) {
|
||||
usleep(switch_init_ts_ + switch_delay_ts_ - current_ts);
|
||||
}
|
||||
if (OB_FAIL(modify_tenant_io(curr_tenant_))) {
|
||||
LOG_WARN("modify config failed", K(ret), K(curr_tenant_));
|
||||
}
|
||||
}
|
||||
|
||||
int IOTracerSwitch::modify_tenant_io(IOPerfTenant &curr_tenant)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ATOMIC_SET(&curr_tenant.config_.enable_io_tracer_, true);
|
||||
if (OB_FAIL(OB_IO_MANAGER.refresh_tenant_io_config(curr_tenant.tenant_id_, curr_tenant.config_))) {
|
||||
LOG_WARN("refresh tenant io config failed", K(ret), K(curr_tenant.tenant_id_), K(curr_tenant.config_));
|
||||
}
|
||||
return ret;
|
||||
}
|
Reference in New Issue
Block a user