[CP] fix io schedule when close resource_plan

This commit is contained in:
renju96
2024-02-07 02:13:28 +00:00
committed by ob-robot
parent bbd30977cd
commit 1be977d9d5
7 changed files with 255 additions and 13 deletions

View File

@ -1181,6 +1181,25 @@ public:
ObIOFd fd_;
};
class IOGroupModify : public ThreadPool
{
public:
IOGroupModify()
: modify_init_ts_(0)
{}
int init(int64_t modify_init_ts, int64_t modify_delay_ts, const IOPerfTenant &curr_tenant);
void destroy();
virtual void run1() override;
TO_STRING_KV(K(load_), K(modify_delay_ts_), K(fd_), K(curr_tenant_));
public:
int64_t modify_init_ts_;
int64_t modify_delay_ts_;
IOPerfTenant curr_tenant_;
ObConcurrentFIFOAllocator allocator_;
IOPerfLoad load_;
ObIOFd fd_;
};
class IOTracerSwitch : public ThreadPool
{
public:
@ -1695,6 +1714,97 @@ TEST_F(TestIOManager, ModifyCallbackThread)
LOG_INFO("modify callback thread finished");
}
TEST_F(TestIOManager, ModifyGroupIO)
{
// use multi thread to do modify group_io_config
bool is_perf_config_exist = false;
ASSERT_SUCC(FileDirectoryUtils::is_exists(GROUP_PERF_CONFIG_FILE, is_perf_config_exist));
if (!is_perf_config_exist) {
write_group_perf_config();
}
// parse configs
IOPerfScheduler scheduler_config;
ObArray<IOPerfDevice> perf_devices;
ObArray<IOPerfTenant> perf_tenants;
ObArray<IOPerfLoad> perf_loads;
ASSERT_SUCC(parse_group_perf_config(GROUP_PERF_CONFIG_FILE, scheduler_config, perf_devices, perf_tenants, perf_loads));
ASSERT_TRUE(perf_devices.count() > 0);
ASSERT_TRUE(perf_tenants.count() > 0);
ASSERT_TRUE(perf_loads.count() > 0);
ObIOManager::get_instance().destroy();
const int64_t memory_limit = 30L * 1024L * 1024L * 1024L; // 30GB
const int64_t queue_depth = 100L;
ASSERT_SUCC(ObIOManager::get_instance().init(memory_limit, queue_depth, scheduler_config.sender_count_, scheduler_config.schedule_media_id_));
ASSERT_SUCC(ObIOManager::get_instance().start());
// prepare devices and files
char *device_buf = (char *)malloc(sizeof(ObLocalDevice) * perf_devices.count());
ASSERT_TRUE(nullptr != device_buf);
for (int64_t i = 0; i < perf_devices.count(); ++i) {
IOPerfDevice &curr_config = perf_devices.at(i);
ASSERT_SUCC(prepare_file(curr_config.file_path_, curr_config.file_size_, curr_config.fd_));
ObLocalDevice *device = new (device_buf + sizeof(ObLocalDevice) * i) ObLocalDevice;
ASSERT_SUCC(init_device(curr_config.media_id_, *device));
ASSERT_SUCC(OB_IO_MANAGER.add_device_channel(device, curr_config.async_channel_count_, curr_config.sync_channel_count_, curr_config.max_io_depth_));
curr_config.device_handle_ = device;
}
// prepare tenant io manager
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
if (curr_config.tenant_id_ == 1002) {
LOG_INFO("qilu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
}
}
// prepare perf runners
char *runner_buf = (char *)malloc(perf_loads.count() * sizeof(IOPerfRunner));
char *modifyer_buf = (char *)malloc(perf_loads.count() * sizeof(IOGroupModify));
ObArray<IOPerfRunner *> runners;
ObArray<IOGroupModify *> modifyers;
const int64_t start_ts = ObTimeUtility::current_time() + 10000L;
for (int64_t i = 0; i < perf_loads.count(); ++i) {
IOPerfRunner *runner = new (runner_buf + i * sizeof(IOPerfRunner)) IOPerfRunner();
const IOPerfLoad &cur_load = perf_loads.at(i);
if (cur_load.tenant_id_ == 1002) {
ASSERT_SUCC(runner->init(start_ts, cur_load));
ASSERT_SUCC(runners.push_back(runner));
LOG_INFO("runner start now");
}
}
//prepare modifyer
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_tenant = perf_tenants.at(i);
if (curr_tenant.tenant_id_ == 1002) {
IOGroupModify *modifyer=new (modifyer_buf + i * sizeof(IOGroupModify)) IOGroupModify();
int64_t modify_init_ts = start_ts;
int64_t modify_delay_ts = 3000000L; //3s后开始修改
ASSERT_SUCC(modifyer->init(modify_init_ts, modify_delay_ts, curr_tenant));
ASSERT_SUCC(modifyers.push_back(modifyer));
}
}
// wait perf finished
for (int64_t i = 0; i < runners.count(); ++i) {
IOPerfRunner *runner = runners.at(i);
runner->wait();
ASSERT_SUCC(runner->print_result());
runner->destroy();
}
free(runner_buf);
free(modifyer_buf);
ObIOManager::get_instance().stop();
ObIOManager::get_instance().destroy();
for (int64_t i = 0; i < perf_devices.count(); ++i) {
ObLocalDevice *device_handle = perf_devices.at(i).device_handle_;
// ASSERT_SUCC(OB_IO_MANAGER.remove_device_channel(device_handle));
device_handle->destroy();
}
free(device_buf);
LOG_INFO("qilu: modify group finished");
}
TEST_F(TestIOManager, abnormal)
{
// simulate submit failure
@ -1783,13 +1893,14 @@ void write_group_perf_config()
"\n"
"tenant_id min_iops max_iops weight group\n"
"1001 5000 100000 700 10001: testgroup1: 80, 100, 60; 10002: testgroup2: 10, 60, 30; 0: OTHER_GROUPS: 10, 100, 10;\n"
"1002 1000 50000 1000 0: testgroup1: 100, 100, 100;\n"
"1002 1000 50000 1000 12345: testgroup1: 50, 50, 50; 0: OTHER_GROUPS: 50, 50, 50;\n"
"\n"
"tenant_id device_id group io_mode io_size_byte io_depth perf_mode target_iops thread_count is_sequence start_s stop_s\n"
"1001 1 0 r 16384 10 rolling 0 16 0 0 8\n"
"1001 1 10001 r 16384 10 rolling 0 16 0 2 7\n"
"1001 1 10002 r 16384 10 rolling 0 16 0 0 6\n"
"1002 1 0 r 16384 10 rolling 0 16 0 0 8\n"
"1002 1 0 r 16384 100 rolling 0 16 0 0 10\n"
"1002 1 12345 r 16384 100 rolling 0 16 0 0 10\n"
;
const int64_t file_len = strlen(file_buf);
int write_ret = ::write(fd, file_buf, file_len);
@ -2344,6 +2455,65 @@ int IOConfModify::modify_tenant_io( const int64_t min_iops,
return ret;
}
int IOGroupModify::init(int64_t modify_init_ts, int64_t modify_delay_ts, const IOPerfTenant &curr_tenant)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(!curr_tenant.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(curr_tenant));
} else if (OB_FAIL(allocator_.init(OB_MALLOC_BIG_BLOCK_SIZE, "group modifier", OB_SERVER_TENANT_ID, 1024L * 1024L * 1024L * 10L))) {
LOG_WARN("init allocator failed", K(ret));
} else {
curr_tenant_ = curr_tenant;
modify_init_ts_ = modify_init_ts;
modify_delay_ts_ = modify_delay_ts;
}
if (OB_SUCC(ret)) {
if (OB_FAIL(set_thread_count(load_.thread_count_ + 1))) {
LOG_WARN("set thread count failed", K(ret), K(modify_init_ts_), K(curr_tenant_));
} else if (OB_FAIL(start())) {
LOG_WARN("start thread failed", K(ret), K(modify_init_ts_), K(curr_tenant_));
}
}
if (OB_FAIL(ret)) {
destroy();
}
return ret;
}
void IOGroupModify::destroy()
{
stop();
wait();
curr_tenant_ = IOPerfTenant();
}
void IOGroupModify::run1()
{
int ret = OB_SUCCESS;
const int64_t thread_idx = get_thread_idx();
LOG_INFO("modify thread start");
//change 1
int64_t current_ts = ObTimeUtility::current_time();
if (modify_init_ts_ + modify_delay_ts_ > current_ts) {
usleep(modify_init_ts_ + modify_delay_ts_ - current_ts);
}
if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, INT64_MAX, 100, 100, 100))) {
LOG_WARN("fail to modify group config", K(ret));
} else if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, 0, 0, 0, 0))) {
LOG_WARN("fail to modify group config", K(ret));
}
//change 2
usleep(3000L * 1000L); // sleep 3s
if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, INT64_MAX, 40, 40, 40))) {
LOG_WARN("fail to modify group config", K(ret));
} else if (OB_FAIL(OB_IO_MANAGER.modify_group_io_config(curr_tenant_.tenant_id_, 0, 60, 60, 60))) {
LOG_WARN("fail to modify group config", K(ret));
}
}
int IOTracerSwitch::init(int64_t switch_init_ts, int64_t switch_delay_ts, const IOPerfTenant &curr_tenant)
{
int ret = OB_SUCCESS;