Remove the tenant IO manager hash map from ObIOManager (tenant ObTenantIOManager instances are now owned and looked up via MTL instead)

This commit is contained in:
nroskill 2024-02-09 08:52:00 +00:00 committed by ob-robot
parent 24f5ebe970
commit 4c24ed8d60
33 changed files with 250 additions and 340 deletions

View File

@ -189,8 +189,6 @@ int Threads::start()
Threads::stop();
Threads::wait();
Threads::destroy();
} else {
stop_ = false;
}
}
return ret;

View File

@ -537,8 +537,6 @@ int MockTenantModuleEnv::prepare_io()
SERVER_LOG(ERROR, "init log pool fail", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().start())) {
STORAGE_LOG(WARN, "fail to start io manager", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, io_config))) {
STORAGE_LOG(WARN, "add tenant io config failed", K(ret));
} else if (OB_FAIL(ObKVGlobalCache::get_instance().init(&getter_,
bucket_num,
max_cache_size,
@ -678,7 +676,7 @@ int MockTenantModuleEnv::init()
STORAGE_LOG(ERROR, "init_before_start_mtl failed", K(ret));
} else {
oceanbase::ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION);
MTL_BIND2(nullptr, ObTenantIOManager::mtl_init, nullptr, nullptr, nullptr, ObTenantIOManager::mtl_destroy);
MTL_BIND2(ObTenantIOManager::mtl_new, ObTenantIOManager::mtl_init, mtl_start_default, mtl_stop_default, nullptr, ObTenantIOManager::mtl_destroy);
MTL_BIND2(mtl_new_default, omt::ObSharedTimer::mtl_init, omt::ObSharedTimer::mtl_start, omt::ObSharedTimer::mtl_stop, omt::ObSharedTimer::mtl_wait, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObTenantSchemaService::mtl_init, nullptr, nullptr, nullptr, mtl_destroy_default);
MTL_BIND2(mtl_new_default, ObStorageLogger::mtl_init, ObStorageLogger::mtl_start, ObStorageLogger::mtl_stop, ObStorageLogger::mtl_wait, mtl_destroy_default);

View File

@ -222,9 +222,6 @@ void TestIndexBlockDataPrepare::SetUpTestCase()
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
ObClockGenerator::init();
ObIOManager::get_instance().add_tenant_io_manager(
tenant_id_, ObTenantIOConfig::default_instance());
fake_freeze_info();
// create ls

View File

@ -190,10 +190,6 @@ void TestIndexTree::SetUp()
ASSERT_EQ(decode_res_pool_, MTL(ObDecodeResourcePool *));
ASSERT_EQ(mem_pool_, MTL(ObTenantCompactionMemPool *));
int tmp_ret = OB_SUCCESS;
ObTenantIOConfig io_config = ObTenantIOConfig::default_instance();
if (OB_TMP_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, io_config))) {
STORAGE_LOG(WARN, "add tenant io config failed", K(tmp_ret));
}
prepare_schema();
row_generate_.reset();

View File

@ -95,9 +95,6 @@ void TestLSTabletService::SetUpTestCase()
ASSERT_EQ(OB_SUCCESS, ret);
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
ObIOManager::get_instance().add_tenant_io_manager(
TestSchemaUtils::TEST_TENANT_ID, ObTenantIOConfig::default_instance());
// create ls
ObLSHandle ls_handle;
ret = TestDmlCommon::create_ls(TestSchemaUtils::TEST_TENANT_ID, ObLSID(TEST_LS_ID), ls_handle);

View File

@ -63,10 +63,6 @@ void TestSharedBlockRWriter::TearDownTestCase()
void TestSharedBlockRWriter::SetUp()
{
int ret = OB_SUCCESS;
const ObTenantIOConfig &io_config = ObTenantIOConfig::default_instance();
if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, io_config))) {
STORAGE_LOG(WARN, "add tenant io config failed");
}
}
void TestSharedBlockRWriter::TearDown()

View File

@ -98,10 +98,6 @@ void TestTabletMdsData::TearDownTestCase()
void TestTabletMdsData::SetUp()
{
int ret = OB_SUCCESS;
const ObTenantIOConfig &io_config = ObTenantIOConfig::default_instance();
if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, io_config))) {
STORAGE_LOG(WARN, "add tenant io config failed");
}
}
void TestTabletMdsData::TearDown()

View File

@ -2118,7 +2118,6 @@ int ObServer::init_io()
}
io_config.disk_io_thread_count_ = GCONF.disk_io_thread_count;
const int64_t max_io_depth = 256;
ObTenantIOConfig server_tenant_io_config = ObTenantIOConfig::default_instance();
if (OB_FAIL(ObIOManager::get_instance().set_io_config(io_config))) {
LOG_ERROR("config io manager fail, ", KR(ret));
} else {
@ -2196,9 +2195,6 @@ int ObServer::init_io()
io_config.disk_io_thread_count_ / 2,
max_io_depth))) {
LOG_ERROR("add device channel failed", KR(ret));
} else if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID,
server_tenant_io_config))) {
LOG_ERROR("add server tenant io manager failed", KR(ret));
}
}
}

View File

@ -438,7 +438,7 @@ int ObMultiTenant::init(ObAddr myaddr,
}
if (OB_SUCC(ret) && mtl_bind_flag) {
MTL_BIND2(nullptr, ObTenantIOManager::mtl_init, nullptr, nullptr, nullptr, ObTenantIOManager::mtl_destroy);
MTL_BIND2(ObTenantIOManager::mtl_new, ObTenantIOManager::mtl_init, mtl_start_default, mtl_stop_default, nullptr, ObTenantIOManager::mtl_destroy);
// base mtl
MTL_BIND2(mtl_new_default, storage::mds::ObTenantMdsService::mtl_init, storage::mds::ObTenantMdsService::mtl_start, storage::mds::ObTenantMdsService::mtl_stop, storage::mds::ObTenantMdsService::mtl_wait, mtl_destroy_default);

View File

@ -317,16 +317,17 @@ ObAllVirtualIOQuota::~ObAllVirtualIOQuota()
int ObAllVirtualIOQuota::init(const common::ObAddr &addr)
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
ObVector<uint64_t> tenant_ids;
if (OB_FAIL(init_addr(addr))) {
LOG_WARN("init failed", K(ret), K(addr));
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) {
LOG_WARN("get tenant id failed", K(ret));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
GCTX.omt_->get_tenant_ids(tenant_ids);
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.size(); ++i) {
const uint64_t cur_tenant_id = tenant_ids.at(i);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (is_virtual_tenant_id(cur_tenant_id)) {
// do nothing
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id));
} else {
@ -584,20 +585,21 @@ ObAllVirtualIOScheduler::~ObAllVirtualIOScheduler()
int ObAllVirtualIOScheduler::init(const common::ObAddr &addr)
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
ObVector<uint64_t> tenant_ids;
if (OB_FAIL(init_addr(addr))) {
LOG_WARN("init failed", K(ret), K(addr));
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) {
LOG_WARN("get tenant id failed", K(ret));
} else {
GCTX.omt_->get_tenant_ids(tenant_ids);
ObIOScheduler *io_scheduler = OB_IO_MANAGER.get_scheduler();
int64_t thread_num = io_scheduler->get_senders_count();
for (int64_t thread_id = 0; OB_SUCC(ret) && thread_id < thread_num; ++thread_id) {
ObIOSender *cur_sender = io_scheduler->get_cur_sender(thread_id);
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.size(); ++i) {
const uint64_t cur_tenant_id = tenant_ids.at(i);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (is_virtual_tenant_id(cur_tenant_id)) {
// do nothing
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id));
} else {

View File

@ -30,8 +30,7 @@ ObIOManager::ObIOManager()
io_config_(),
allocator_(),
fault_detector_(io_config_),
io_scheduler_(io_config_, allocator_),
tenant_map_lock_(ObLatchIds::TENANT_IO_MANAGE_LOCK)
io_scheduler_(io_config_, allocator_)
{
}
@ -63,20 +62,24 @@ int ObIOManager::init(const int64_t memory_limit,
LOG_WARN("init io allocator failed", K(ret));
} else if (OB_FAIL(channel_map_.create(7, "IO_CHANNEL_MAP"))) {
LOG_WARN("create channel map failed", K(ret));
} else if (OB_FAIL(tenant_map_.create(7, "IO_TENANT_MAP"))) {
LOG_WARN("create tenant map failed", K(ret));
} else if (OB_FAIL(io_scheduler_.init(schedule_queue_count, schedule_media_id))) {
LOG_WARN("init io scheduler failed", K(ret));
} else if (OB_FAIL(fault_detector_.init())) {
LOG_WARN("init io fault detector failed", K(ret));
} else if (OB_ISNULL(server_io_manager_ = OB_NEW(ObTenantIOManager, "IO_MGR"))) {
} else if (OB_FAIL(server_io_manager_->init(OB_SERVER_TENANT_ID, ObTenantIOConfig::default_instance(), &io_scheduler_))) {
LOG_WARN("init server tenant io mgr failed", K(ret));
} else {
ObMemAttr attr(OB_SERVER_TENANT_ID, "IO_MGR");
SET_USE_500(attr);
allocator_.set_attr(attr);
io_config_.set_default_value();
is_inited_ = true;
if (OB_FAIL(server_io_manager_->start())) {
LOG_WARN("init server tenant io mgr start failed", K(ret));
}
}
if (OB_UNLIKELY(!is_inited_)) {
if (OB_FAIL(ret)) {
destroy();
}
return ret;
@ -97,21 +100,6 @@ private:
ObIAllocator &allocator_;
};
struct DestroyTenantMapFn
{
public:
DestroyTenantMapFn(ObIAllocator &allocator) : allocator_(allocator) {}
int operator () (oceanbase::common::hash::HashMapPair<uint64_t, ObTenantIOManager *> &entry) {
if (nullptr != entry.second) {
entry.second->~ObTenantIOManager();
allocator_.free(entry.second);
}
return OB_SUCCESS;
}
private:
ObIAllocator &allocator_;
};
void ObIOManager::destroy()
{
stop();
@ -120,9 +108,8 @@ void ObIOManager::destroy()
DestroyChannelMapFn destry_channel_map_fn(allocator_);
channel_map_.foreach_refactored(destry_channel_map_fn);
channel_map_.destroy();
DestroyTenantMapFn destry_tenant_map_fn(allocator_);
tenant_map_.foreach_refactored(destry_tenant_map_fn);
tenant_map_.destroy();
OB_DELETE(ObTenantIOManager, "IO_MGR", server_io_manager_);
server_io_manager_ = nullptr;
allocator_.destroy();
is_inited_ = false;
}
@ -146,6 +133,9 @@ int ObIOManager::start()
void ObIOManager::stop()
{
is_working_ = false;
if (OB_NOT_NULL(server_io_manager_)) {
server_io_manager_->stop();
}
io_scheduler_.stop();
}
@ -343,17 +333,21 @@ int ObIOManager::adjust_tenant_clock()
} else {
ObTenantIOManager *tenant_io_mgr = nullptr;
ObArray<ObTenantIOClock *> io_clocks;
DRWLock::RDLockGuard guard(tenant_map_lock_);
hash::ObHashMap<uint64_t, ObTenantIOManager *>::iterator iter = tenant_map_.begin();
for (; OB_SUCC(ret) && iter != tenant_map_.end(); ++iter) {
if (OB_ISNULL(tenant_io_mgr = iter->second)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("tenant io manager is null", K(ret));
ObVector<uint64_t> tenant_ids;
if (OB_NOT_NULL(GCTX.omt_)) {
GCTX.omt_->get_tenant_ids(tenant_ids);
}
for (int64_t i = 0; i < tenant_ids.size(); ++i) {
const uint64_t cur_tenant_id = tenant_ids.at(i);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id));
} else if (FALSE_IT(tenant_io_mgr = tenant_holder.get_ptr())) {
} else if (OB_FAIL(io_clocks.push_back(tenant_io_mgr->get_io_clock()))) {
LOG_WARN("push back io clock failed", K(ret), K(tenant_map_.size()));
LOG_WARN("push back io clock failed", K(ret), K(tenant_ids.size()));
}
}
if (OB_SUCC(ret) && !io_clocks.empty()) {
if (!io_clocks.empty()) {
if (OB_FAIL(io_clocks.at(0)->sync_clocks(io_clocks))) {
LOG_WARN("sync io clocks failed", K(ret), K(io_clocks));
}
@ -473,75 +467,6 @@ int ObIOManager::get_device_channel(const ObIODevice *device_handle, ObDeviceCha
return ret;
}
int ObIOManager::add_tenant_io_manager(const uint64_t tenant_id, const ObTenantIOConfig &tenant_io_config)
{
int ret = OB_SUCCESS;
ObTenantIOManager *tenant_io_mgr = nullptr;
void *buf = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(tenant_id <= 0 || !tenant_io_config.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id), K(tenant_io_config));
} else if (OB_ISNULL(buf = allocator_.alloc(sizeof(ObTenantIOManager)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("alloc memory failed", K(ret));
} else if (FALSE_IT(tenant_io_mgr = new (buf) ObTenantIOManager())) {
} else if (OB_FAIL(tenant_io_mgr->init(tenant_id, tenant_io_config, &io_scheduler_))) {
LOG_WARN("init tenant io manager failed", K(ret), K(tenant_id), K(tenant_io_config));
} else if (OB_FAIL(tenant_io_mgr->start())) {
LOG_WARN("start tenant io manager failed", K(ret), K(tenant_id));
} else {
tenant_io_mgr->inc_ref();
DRWLock::WRLockGuard guard(tenant_map_lock_);
if (OB_FAIL(tenant_map_.set_refactored(tenant_id, tenant_io_mgr))) {
LOG_WARN("put into tenant map failed", K(ret), K(tenant_id), KP(tenant_io_mgr));
} else {
LOG_INFO("add tenant io manager success", K(tenant_id), KPC(tenant_io_mgr));
tenant_io_mgr = nullptr;
}
}
if (OB_SUCC(ret)) {
int tmp_ret = adjust_tenant_clock();
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
LOG_WARN("adjust tenant clock failed", K(tmp_ret));
}
}
if (OB_UNLIKELY(nullptr != tenant_io_mgr)) {
tenant_io_mgr->~ObTenantIOManager();
allocator_.free(tenant_io_mgr);
}
return ret;
}
int ObIOManager::remove_tenant_io_manager(const uint64_t tenant_id)
{
int ret = OB_SUCCESS;
ObTenantIOManager *tenant_io_mgr = nullptr;
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret), K(is_inited_));
} else if (OB_UNLIKELY(tenant_id <= 0)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(tenant_id));
} else if (OB_FAIL(io_scheduler_.remove_phyqueues(tenant_id))) {
LOG_WARN("remove phy_queues from map failed", K(ret), K(tenant_id));
} else {
DRWLock::WRLockGuard guard(tenant_map_lock_);
if (OB_FAIL(tenant_map_.erase_refactored(tenant_id, &tenant_io_mgr))) {
LOG_WARN("remove tenant io manager failed", K(ret), K(tenant_id), KP(tenant_io_mgr));
} else {
LOG_INFO("remove tenant io manager success", K(tenant_id), KP(tenant_io_mgr));
}
}
if (OB_SUCC(ret) && nullptr != tenant_io_mgr) {
tenant_io_mgr->stop();
tenant_io_mgr->dec_ref();
}
return ret;
}
int ObIOManager::refresh_tenant_io_config(const uint64_t tenant_id, const ObTenantIOConfig &tenant_io_config)
{
int ret = OB_SUCCESS;
@ -588,27 +513,22 @@ int ObIOManager::modify_group_io_config(const uint64_t tenant_id,
int ObIOManager::get_tenant_io_manager(const uint64_t tenant_id, ObRefHolder<ObTenantIOManager> &tenant_holder)
{
int ret = OB_SUCCESS;
ObTenantIOManager *tenant_io_mgr = nullptr;
DRWLock::RDLockGuard guard(tenant_map_lock_);
if (OB_FAIL(tenant_map_.get_refactored(tenant_id, tenant_io_mgr))) {
LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id));
} else {
if (OB_SERVER_TENANT_ID == tenant_id) {
tenant_holder.hold(server_io_manager_);
} else if (is_virtual_tenant_id(tenant_id)) {
// DO NOT SUPPORT
} else if (MTL_ID() == tenant_id) {
ObTenantIOManager *tenant_io_mgr = MTL(ObTenantIOManager*);
tenant_holder.hold(tenant_io_mgr);
}
return ret;
}
int ObIOManager::get_tenant_ids(ObIArray<uint64_t> &tenant_ids)
{
int ret = OB_SUCCESS;
tenant_ids.reset();
DRWLock::RDLockGuard guard(tenant_map_lock_);
hash::ObHashMap<uint64_t, ObTenantIOManager *>::iterator iter = tenant_map_.begin();
for (; OB_SUCC(ret) && iter != tenant_map_.end(); ++iter) {
if (OB_FAIL(tenant_ids.push_back(iter->first))) {
LOG_WARN("push back tenant id failed", K(ret), K(iter->first));
} else {
MTL_SWITCH(tenant_id) {
ObTenantIOManager *tenant_io_mgr = MTL(ObTenantIOManager*);
tenant_holder.hold(tenant_io_mgr);
}
}
if (OB_SUCC(ret) && OB_ISNULL(tenant_holder.get_ptr())) {
ret = OB_HASH_NOT_EXIST; // for compatibility
}
return ret;
}
@ -618,27 +538,38 @@ ObIOScheduler *ObIOManager::get_scheduler()
}
/****************** TenantIOManager **********************/
int ObTenantIOManager::mtl_new(ObTenantIOManager *&io_service)
{
int ret = OB_SUCCESS;
void *buf = nullptr;
io_service = nullptr;
if (is_virtual_tenant_id(MTL_ID())) {
// do nothing
} else if (OB_ISNULL(buf = OB_IO_MANAGER.allocator_.alloc(sizeof(ObTenantIOManager)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
FLOG_WARN("failed to alloc tenant io mgr", K(ret));
} else {
io_service = new (buf) ObTenantIOManager();
}
return ret;
}
int ObTenantIOManager::mtl_init(ObTenantIOManager *&io_service)
{
int ret = OB_SUCCESS;
io_service = nullptr;
const uint64_t tenant_id = MTL_ID();
ObRefHolder<ObTenantIOManager> holder;
if (OB_FAIL(OB_IO_MANAGER.add_tenant_io_manager(tenant_id, ObTenantIOConfig::default_instance()))) {
if (OB_HASH_EXIST != ret) {
LOG_WARN("add tenant io manager failed", K(ret), K(tenant_id));
if (OB_ISNULL(io_service)) {
if (is_virtual_tenant_id(tenant_id)) {
// do nothing
} else {
ret = OB_SUCCESS;
ret = OB_INVALID_ARGUMENT;
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(tenant_id, holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(tenant_id));
} else {
io_service = holder.get_ptr();
}
}
if (OB_SUCC(ret)) {
} else if (OB_FAIL(io_service->init(tenant_id,
ObTenantIOConfig::default_instance(),
&OB_IO_MANAGER.io_scheduler_))) {
FLOG_WARN("mtl iit tenant io manager failed", K(tenant_id));
} else {
FLOG_INFO("mtl init tenant io manager success", K(tenant_id), KPC(io_service));
}
return ret;
@ -647,36 +578,11 @@ int ObTenantIOManager::mtl_init(ObTenantIOManager *&io_service)
void ObTenantIOManager::mtl_destroy(ObTenantIOManager *&io_service)
{
int ret = OB_SUCCESS;
const int64_t start_ts = ObTimeUtility::current_time();
while (OB_NOT_NULL(io_service) && OB_SUCC(ret)) {
if (io_service->get_ref_cnt() == 1) {
break;
} else {
if (REACH_TIME_INTERVAL(1000L * 1000L)) { //1s
LOG_INFO("wait tenant io manager quit", K(MTL_ID()), K(start_ts), K(io_service->get_ref_cnt()));
}
ob_usleep((useconds_t)10L * 1000L); //10ms
}
}
const uint64_t tenant_id = MTL_ID();
if (OB_FAIL(OB_IO_MANAGER.remove_tenant_io_manager(tenant_id))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("remove tenant io manager failed", K(ret), K(tenant_id));
} else {
ret = OB_SUCCESS;
}
}
if (OB_SUCC(ret)) {
if (OB_NOT_NULL(io_service) && io_service->get_ref_cnt() == 0) {
io_service->~ObTenantIOManager();
OB_IO_MANAGER.allocator_.free(io_service);
io_service = nullptr;
FLOG_INFO("mtl destroy tenant io manager success", K(tenant_id));
} else if (OB_NOT_NULL(io_service) && io_service->get_ref_cnt() != 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("ERROR: tenant io manager ref_cnt is not zero", K(ret));
}
if (OB_NOT_NULL(io_service)) {
io_service->~ObTenantIOManager();
OB_IO_MANAGER.allocator_.free(io_service);
io_service = nullptr;
FLOG_INFO("mtl destroy tenant io manager success", K(MTL_ID()));
}
}
@ -740,6 +646,7 @@ int ObTenantIOManager::init(const uint64_t tenant_id,
} else {
tenant_id_ = tenant_id;
io_scheduler_ = io_scheduler;
inc_ref();
is_inited_ = true;
}
if (OB_UNLIKELY(!is_inited_)) {
@ -750,7 +657,21 @@ int ObTenantIOManager::init(const uint64_t tenant_id,
void ObTenantIOManager::destroy()
{
ATOMIC_SET(&is_working_, false);
ATOMIC_STORE(&is_working_, false);
const int64_t start_ts = ObTimeUtility::current_time();
while (1 != get_ref_cnt()) {
if (REACH_TIME_INTERVAL(1000L * 1000L)) { //1s
LOG_INFO("wait tenant io manager quit", K(MTL_ID()), K(start_ts), K(get_ref_cnt()));
}
ob_usleep((useconds_t)10L * 1000L); //10ms
}
dec_ref();
int ret = OB_SUCCESS;
if (OB_NOT_NULL(io_scheduler_) && OB_FAIL(io_scheduler_->remove_phyqueues(MTL_ID()))) {
LOG_WARN("remove phy_queues from map failed", K(ret), K(MTL_ID()));
}
if (OB_NOT_NULL(io_clock_)) {
io_clock_->destroy();
@ -785,13 +706,17 @@ int ObTenantIOManager::start()
LOG_WARN("init callback manager failed", K(ret), K(tenant_id_), K(callback_thread_count));
} else {
is_working_ = true;
int tmp_ret = OB_IO_MANAGER.adjust_tenant_clock();
if (OB_UNLIKELY(OB_SUCCESS != tmp_ret)) {
LOG_WARN("adjust tenant clock failed", K(tmp_ret));
}
}
return ret;
}
void ObTenantIOManager::stop()
{
ATOMIC_SET(&is_working_, false);
ATOMIC_STORE(&is_working_, false);
callback_mgr_.destroy();
}
@ -1672,7 +1597,7 @@ void ObTenantIOManager::dec_ref()
int64_t tmp_ref = ATOMIC_SAF(&ref_cnt_, 1);
if (tmp_ref < 0) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("bug: ref_cnt < 0", K(ret), K(tmp_ref), KCSTRING(lbt()));
LOG_ERROR("bug: ref_cnt < 0", K(ret), K(tmp_ref));
abort();
}
}

View File

@ -68,11 +68,8 @@ public:
int get_device_channel(const ObIODevice *device_handle, ObDeviceChannel *&device_channel);
// tenant management
int add_tenant_io_manager(const uint64_t tenant_id, const ObTenantIOConfig &tenant_io_config);
int remove_tenant_io_manager(const uint64_t tenant_id);
int refresh_tenant_io_config(const uint64_t tenant_id, const ObTenantIOConfig &tenant_io_config);
int get_tenant_io_manager(const uint64_t tenant_id, ObRefHolder<ObTenantIOManager> &tenant_holder);
int get_tenant_ids(ObIArray<uint64_t> &tenant_ids);
int modify_group_io_config(const uint64_t tenant_id,
const uint64_t index,
const int64_t min_percent,
@ -98,13 +95,13 @@ private:
hash::ObHashMap<int64_t /*device_handle*/, ObDeviceChannel *> channel_map_;
ObIOFaultDetector fault_detector_;
ObIOScheduler io_scheduler_;
DRWLock tenant_map_lock_;
hash::ObHashMap<uint64_t /*tenant_id*/, ObTenantIOManager *> tenant_map_;
ObTenantIOManager *server_io_manager_;
};
class ObTenantIOManager final
{
public:
static int mtl_new(ObTenantIOManager *&io_service);
static int mtl_init(ObTenantIOManager *&io_service);
static void mtl_destroy(ObTenantIOManager *&io_service);
public:

View File

@ -772,13 +772,12 @@ int ObIOTuner::init()
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
LOG_WARN("init twice", K(ret));
} else if (OB_FAIL(TG_SET_RUNNABLE_AND_START(lib::TGDefIDs::IO_TUNING, *this))) {
LOG_WARN("start io scheduler failed", K(ret));
} else {
is_inited_ = true;
if (OB_FAIL(TG_SET_RUNNABLE_AND_START(lib::TGDefIDs::IO_TUNING, *this))) {
LOG_WARN("start io scheduler failed", K(ret));
}
}
if (OB_UNLIKELY(!is_inited_)) {
if (OB_FAIL(ret)) {
destroy();
}
return ret;
@ -862,14 +861,17 @@ void ObIOTuner::print_sender_status()
int ObIOTuner::try_release_thread()
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) {
LOG_WARN("get tenant id failed", K(ret));
} else if (tenant_ids.count() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
ObVector<uint64_t> tenant_ids;
if (OB_NOT_NULL(GCTX.omt_)) {
GCTX.omt_->get_tenant_ids(tenant_ids);
}
if (tenant_ids.size() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.size(); ++i) {
const uint64_t cur_tenant_id = tenant_ids.at(i);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (is_virtual_tenant_id(cur_tenant_id)) {
// do nothing
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id));
} else {
tenant_holder.get_ptr()->get_callback_mgr().try_release_thread();
@ -882,14 +884,17 @@ int ObIOTuner::try_release_thread()
void ObIOTuner::print_io_status()
{
int ret = OB_SUCCESS;
ObArray<uint64_t> tenant_ids;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_ids(tenant_ids))) {
LOG_WARN("get tenant id failed", K(ret));
} else if (tenant_ids.count() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.count(); ++i) {
ObVector<uint64_t> tenant_ids;
if (OB_NOT_NULL(GCTX.omt_)) {
GCTX.omt_->get_tenant_ids(tenant_ids);
}
if (tenant_ids.size() > 0) {
for (int64_t i = 0; OB_SUCC(ret) && i < tenant_ids.size(); ++i) {
const uint64_t cur_tenant_id = tenant_ids.at(i);
ObRefHolder<ObTenantIOManager> tenant_holder;
if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (is_virtual_tenant_id(cur_tenant_id)) {
// do nothing
} else if (OB_FAIL(OB_IO_MANAGER.get_tenant_io_manager(cur_tenant_id, tenant_holder))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("get tenant io manager failed", K(ret), K(cur_tenant_id));
} else {
@ -1042,7 +1047,7 @@ public:
LOG_WARN("get tenant io manager failed", K(ret), K(entry.first));
} else {
entry.second->~ObIOGroupQueues();
if (OB_NOT_NULL(tenant_holder.get_ptr()->get_tenant_io_allocator())) {
if (OB_NOT_NULL(tenant_holder.get_ptr()) && OB_NOT_NULL(tenant_holder.get_ptr()->get_tenant_io_allocator())) {
tenant_holder.get_ptr()->get_tenant_io_allocator()->free(entry.second);
}
}

View File

@ -128,8 +128,6 @@ int ObAdminExecutor::prepare_io()
} else if (OB_FAIL(ObIOManager::get_instance().add_device_channel(THE_IO_DEVICE,
async_io_thread_count, sync_io_thread_count, max_io_depth))) {
LOG_WARN("add device channel failed", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, tenant_io_config))) {
LOG_WARN("add server tenant io manager failed", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().start())) {
LOG_WARN("fail to start io manager", K(ret));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.init(THE_IO_DEVICE, storage_env_.default_block_size_))) {

View File

@ -165,10 +165,16 @@ public:
blocksstable::TestDataFilePrepare::SetUp();
ret = blocksstable::ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(tenant_id_);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
if (!is_server_tenant(tenant_id_)) {
static ObTenantBase tenant_ctx(tenant_id_);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
cell_cnt_ = COLS;
init_exprs();

View File

@ -71,10 +71,16 @@ public:
blocksstable::TestDataFilePrepare::SetUp();
ret = blocksstable::ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(tenant_id_);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
if (!is_server_tenant(tenant_id_)) {
static ObTenantBase tenant_ctx(tenant_id_);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
row_.count_ = COLS;
row_.cells_ = cells_;

View File

@ -167,8 +167,6 @@ int TestOpEngine::prepare_io(const string & test_data_name_suffix)
LOG_WARN("add device channel failed", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().start())) {
LOG_WARN("fail to start io manager", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SYS_TENANT_ID, io_config))) {
LOG_WARN("add tenant io config failed", K(ret));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.init(THE_IO_DEVICE, storage_env_.default_block_size_))) {
STORAGE_LOG(WARN, "init block manager fail", K(ret));
} else if (OB_FAIL(FileDirectoryUtils::create_full_path(file_dir))) {

View File

@ -119,10 +119,14 @@ void TestBackupCtx::SetUp()
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ret = ObTmpFileManager::get_instance().init();
EXPECT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
inner_init_();
}

View File

@ -327,10 +327,14 @@ void TestBackupIndexMerger::SetUp()
} else {
ASSERT_EQ(OB_SUCCESS, ret);
}
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
inner_init_();
}

View File

@ -116,10 +116,14 @@ void TestBackupIndexIterator::SetUp()
} else {
ASSERT_EQ(OB_SUCCESS, ret);
}
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
ASSERT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
inner_init_();
}

View File

@ -68,10 +68,14 @@ void TestBackupTmpFile::SetUp()
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ret = ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestBackupTmpFile::TearDown()

View File

@ -316,7 +316,11 @@ void TestBackupExternalSort::SetUp()
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestBackupExternalSort::TearDown()

View File

@ -294,8 +294,6 @@ int TestDataFilePrepareUtil::open()
STORAGE_LOG(WARN, "Fail to init OB_STORE_CACHE, ", K(ret), K(storage_env_.data_dir_));
} else if (OB_FAIL(ObIOManager::get_instance().start())) {
STORAGE_LOG(WARN, "Fail to star io mgr", K(ret));
} else if (OB_FAIL(ObIOManager::get_instance().add_tenant_io_manager(OB_SERVER_TENANT_ID, io_config))) {
STORAGE_LOG(WARN, "add tenant io config failed", K(ret));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.start(0/*reserver_size*/))) {
STORAGE_LOG(WARN, "Fail to start server block mgr", K(ret));
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.first_mark_device())) {

View File

@ -237,9 +237,6 @@ void ObMultiVersionSSTableTest::SetUpTestCase()
ObServerCheckpointSlogHandler::get_instance().is_started_ = true;
//OK(init_io_device("multi_version_test"));
ObIOManager::get_instance().add_tenant_io_manager(
tenant_id_, ObTenantIOConfig::default_instance());
// create ls
ObLSHandle ls_handle;
ret = TestDmlCommon::create_ls(tenant_id_, ObLSID(ls_id_), ls_handle);

View File

@ -83,10 +83,14 @@ void TestBloomFilterDataReaderWriter::SetUp()
ret = row_generate_.init(table_schema_);
ASSERT_EQ(OB_SUCCESS, ret);
ASSERT_EQ(OB_SUCCESS, row_.init(allocator_, TEST_COLUMN_CNT));
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestBloomFilterDataReaderWriter::TearDown()

View File

@ -627,10 +627,14 @@ void TestTmpFile::SetUp()
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ret = ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestTmpFile::TearDown()

View File

@ -186,10 +186,14 @@ void TestCompactChunk::SetUp()
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ret = ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestCompactChunk::TearDown()

View File

@ -169,10 +169,14 @@ void TestDataBlockWriter::SetUp()
ret = ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestDataBlockWriter::TearDown()

View File

@ -136,10 +136,14 @@ void TestIndexBlockWriter::SetUp()
CHUNK_MGR.set_limit(8L * 1024L * 1024L * 1024L);
ret = ObTmpFileManager::get_instance().init();
ASSERT_EQ(OB_SUCCESS, ret);
static ObTenantBase tenant_ctx(1);
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestIndexBlockWriter::TearDown()

View File

@ -57,7 +57,7 @@ class TestStorageLogRW : public TestDataFilePrepare
public:
TestStorageLogRW()
: TestDataFilePrepare(&getter, "TestStorageLogRW"),
tenant_base1_(1),
tenant_base1_(OB_SERVER_TENANT_ID),
slogger_(nullptr)
{
}
@ -100,14 +100,12 @@ void TestStorageLogRW::SetUp()
SLOGGERMGR.init(dir_, MAX_FILE_SIZE, log_file_spec_);
ObStorageLogger *tmp_slogger = OB_NEW(ObStorageLogger, ObModIds::TEST);
ASSERT_EQ(OB_SUCCESS, tmp_slogger->init(SLOGGERMGR, 1));
ASSERT_EQ(OB_SUCCESS, tmp_slogger->init(SLOGGERMGR, OB_SERVER_TENANT_ID));
ASSERT_EQ(OB_SUCCESS, tmp_slogger->start());
tenant_base1_.set(tmp_slogger);
ObTenantEnv::set_tenant(&tenant_base1_);
ASSERT_EQ(OB_SUCCESS, tenant_base1_.init());
EXPECT_EQ(OB_SUCCESS, ObIOManager::get_instance().add_tenant_io_manager(
tenant_base1_.id(), ObTenantIOConfig::default_instance()));
slogger_ = MTL(ObStorageLogger*);
slogger_->start_log(start_cursor_);

View File

@ -73,10 +73,6 @@ void TestStorageLogReplay::SetUp()
TestDataFilePrepare::SetUp();
FileDirectoryUtils::create_full_path("./test_storage_log_replay");
SLOGGERMGR.init(dir_, MAX_FILE_SIZE, log_file_spec_);
static ObTenantBase tenant_ctx(10);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
}
void TestStorageLogReplay::TearDown()

View File

@ -20,6 +20,7 @@
#include "share/io/ob_io_calibration.h"
#include "share/io/io_schedule/ob_io_mclock.h"
#include "share/resource_manager/ob_cgroup_ctrl.h"
#include "mittest/mtlenv/mock_tenant_module_env.h"
#undef private
#include "share/ob_local_device.h"
#include "lib/thread/thread_pool.h"
@ -38,13 +39,12 @@ using namespace oceanbase::common;
using namespace oceanbase::share;
using namespace oceanbase::blocksstable;
#define TEST_ROOT_DIR "io_test"
#define TEST_ROOT_DIR "./"
#define TEST_DATA_DIR TEST_ROOT_DIR "/data_dir"
#define TEST_SSTABLE_DIR TEST_DATA_DIR "/sstable"
static const int64_t IO_MEMORY_LIMIT = 10L * 1024L * 1024L * 1024L;
static const uint64_t TEST_TENANT_ID = 1001;
static const uint64_t TEST_TENANT_ID = 1;
int init_device(const int64_t media_id, ObLocalDevice &device)
{
@ -54,8 +54,8 @@ int init_device(const int64_t media_id, ObLocalDevice &device)
const int64_t data_disk_size = 1024L * 1024L * 1024L; // 1GB
const int64_t data_disk_percentage = 50L;
ObIODOpt io_opts[IO_OPT_COUNT];
io_opts[0].key_ = "data_dir"; io_opts[0].value_.value_str = TEST_DATA_DIR;
io_opts[1].key_ = "sstable_dir"; io_opts[1].value_.value_str = TEST_SSTABLE_DIR;
io_opts[0].key_ = "data_dir"; io_opts[0].value_.value_str = oceanbase::MockTenantModuleEnv::get_instance().storage_env_.data_dir_;
io_opts[1].key_ = "sstable_dir"; io_opts[1].value_.value_str = oceanbase::MockTenantModuleEnv::get_instance().storage_env_.sstable_dir_;
io_opts[2].key_ = "block_size"; io_opts[2].value_.value_int64 = block_size;
io_opts[3].key_ = "datafile_disk_percentage"; io_opts[3].value_.value_int64 = data_disk_percentage;
io_opts[4].key_ = "datafile_size"; io_opts[4].value_.value_int64 = data_disk_size;
@ -80,7 +80,6 @@ int init_device(const int64_t media_id, ObLocalDevice &device)
return ret;
}
class TestIOStruct : public ::testing::Test
{
public:
@ -90,17 +89,14 @@ static void SetUpTestCase()
system("mkdir -p " TEST_DATA_DIR);
system("mkdir -p " TEST_SSTABLE_DIR);
ASSERT_SUCC(oceanbase::MockTenantModuleEnv::get_instance().init());
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1002);
// init io device
static oceanbase::share::ObLocalDevice local_device;
ASSERT_SUCC(init_device(0, local_device));
THE_IO_DEVICE = &local_device;
ObMallocAllocator::get_instance()->get_tenant_ctx_allocator(OB_SERVER_TENANT_ID, 0)->set_limit(IO_MEMORY_LIMIT);
}
static void TearDownTestCase()
{
THE_IO_DEVICE->destroy();
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1002);
}
@ -302,10 +298,9 @@ TEST_F(TestIOStruct, IOAllocator)
TEST_F(TestIOStruct, IORequest)
{
ObTenantIOManager tenant_io_mgr;
tenant_io_mgr.inc_ref();
ASSERT_SUCC(tenant_io_mgr.io_allocator_.init(TEST_TENANT_ID, IO_MEMORY_LIMIT));
ObRefHolder<ObTenantIOManager> holder(&tenant_io_mgr);
ObRefHolder<ObTenantIOManager> holder;
OB_IO_MANAGER.get_tenant_io_manager(OB_SERVER_TENANT_ID, holder);
ObTenantIOManager &tenant_io_mgr = *(holder.get_ptr());
ObIOFd fd;
fd.first_id_ = 0;
fd.second_id_ = 1;
@ -544,10 +539,7 @@ TEST_F(TestIOStruct, IOScheduler)
ObIOAllocator io_allocator;
ASSERT_SUCC(io_allocator.init(TEST_TENANT_ID, IO_MEMORY_LIMIT));
ASSERT_TRUE(io_config.is_valid());
ObIOScheduler scheduler(io_config, io_allocator);
ASSERT_FALSE(scheduler.is_inited_);
ASSERT_SUCC(scheduler.init(2));
ASSERT_TRUE(scheduler.is_inited_);
ObIOScheduler &scheduler = *(OB_IO_MANAGER.get_scheduler());
// test schedule
ObIOResult result;
@ -654,17 +646,15 @@ TEST_F(TestIOStruct, Test_Size)
TEST_F(TestIOStruct, IOResult)
{
ObTenantIOManager tenant_io_mgr;
tenant_io_mgr.inc_ref();
ASSERT_SUCC(tenant_io_mgr.io_allocator_.init(TEST_TENANT_ID, IO_MEMORY_LIMIT));
ObRefHolder<ObTenantIOManager> holder(&tenant_io_mgr);
ObRefHolder<ObTenantIOManager> holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(TEST_TENANT_ID, holder));
ObIOFd fd;
fd.first_id_ = 0;
fd.second_id_ = 1;
void *result_buf = tenant_io_mgr.io_allocator_.alloc(sizeof(ObIOResult));
void *result_buf = holder.get_ptr()->io_allocator_.alloc(sizeof(ObIOResult));
ObIOResult *result = new (result_buf) ObIOResult;
void *req_buf = tenant_io_mgr.io_allocator_.alloc(sizeof(ObIORequest));
void *req_buf = holder.get_ptr()->io_allocator_.alloc(sizeof(ObIORequest));
ObIORequest *req = new (req_buf) ObIORequest;
@ -677,8 +667,8 @@ TEST_F(TestIOStruct, IOResult)
// prepare test read request
req->destroy();
result->destroy();
req->tenant_io_mgr_.hold(&tenant_io_mgr);
result->tenant_io_mgr_.hold(&tenant_io_mgr);
req->tenant_io_mgr_.hold(holder.get_ptr());
result->tenant_io_mgr_.hold(holder.get_ptr());
result->inc_ref();
req->inc_ref();
@ -763,12 +753,8 @@ TEST_F(TestIOStruct, IOCallbackManager)
TEST_F(TestIOStruct, IOFaultDetector)
{
// test init
ObIOConfig io_config = ObIOConfig::default_config();
ObIOFaultDetector detector(io_config);
ASSERT_FALSE(detector.is_inited_);
ASSERT_SUCC(detector.init());
ASSERT_TRUE(detector.is_inited_);
ObIOFaultDetector &detector = OB_IO_MANAGER.get_device_health_detector();
ObIOConfig &io_config = (ObIOConfig &)detector.io_config_;
// test get device health
ObDeviceHealthStatus dhs = DEVICE_HEALTH_NORMAL;
@ -777,10 +763,6 @@ TEST_F(TestIOStruct, IOFaultDetector)
ASSERT_TRUE(DEVICE_HEALTH_NORMAL == dhs);
ASSERT_TRUE(0 == disk_abnormal_time);
// test start
ASSERT_SUCC(detector.start());
ObIOManager::get_instance().is_working_ = true;
// test read failure detection
ObIOInfo io_info = get_random_io_info();
ObIOResult result;
@ -827,35 +809,27 @@ TEST_F(TestIOStruct, IOFaultDetector)
ASSERT_SUCC(detector.get_device_health_status(dhs, disk_abnormal_time));
ASSERT_TRUE(DEVICE_HEALTH_ERROR == dhs);
ASSERT_TRUE(disk_abnormal_time > 0);
// test destroy
detector.destroy();
ASSERT_FALSE(detector.is_inited_);
}
TEST_F(TestIOStruct, IOManager)
{
ObIOManager io_mgr;
ASSERT_FALSE(io_mgr.is_inited_);
ASSERT_SUCC(io_mgr.init());
ASSERT_TRUE(io_mgr.is_inited_);
ASSERT_SUCC(io_mgr.start());
io_mgr.stop();
io_mgr.destroy();
ASSERT_FALSE(io_mgr.is_inited_);
}
class TestIOManager : public TestIOStruct
{
// basic use resource manager
public:
static void SetUpTestCase()
{
}
static void TearDownTestCase()
{
oceanbase::MockTenantModuleEnv::get_instance().destroy();
}
virtual void SetUp()
{
ObIOManager::get_instance().destroy();
OB_IO_MANAGER.destroy();
const int64_t memory_limit = 10L * 1024L * 1024L * 1024L; // 10GB
ASSERT_SUCC(ObIOManager::get_instance().init(memory_limit));
ASSERT_SUCC(ObIOManager::get_instance().start());
ASSERT_SUCC(OB_IO_MANAGER.init(memory_limit));
ASSERT_SUCC(OB_IO_MANAGER.start());
// add io device
ASSERT_SUCC(OB_IO_MANAGER.add_device_channel(THE_IO_DEVICE, 16, 2, 1024));
@ -871,20 +845,18 @@ public:
io_config.other_group_config_.min_percent_ = 100;
io_config.other_group_config_.max_percent_ = 100;
io_config.other_group_config_.weight_percent_ = 100;
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(tenant_id, io_config));
}
virtual void TearDown()
{
ObIOManager::get_instance().stop();
ObIOManager::get_instance().destroy();
OB_IO_MANAGER.stop();
OB_IO_MANAGER.destroy();
}
};
TEST_F(TestIOManager, memory_pool)
{
ObIOManager &io_mgr = ObIOManager::get_instance();
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(io_mgr.get_tenant_io_manager(500, tenant_holder));
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(500, tenant_holder));
ASSERT_NE(nullptr, tenant_holder.get_ptr());
ObIORequest *io_request = nullptr;
@ -893,6 +865,7 @@ TEST_F(TestIOManager, memory_pool)
io_request->tenant_io_mgr_.hold(tenant_holder.get_ptr());
ASSERT_TRUE(tenant_holder.get_ptr()->io_request_pool_.contain(io_request));
ASSERT_SUCC(tenant_holder.get_ptr()->io_request_pool_.recycle(io_request));
io_request->tenant_io_mgr_.reset();
ObIOResult *io_result = nullptr;
ASSERT_SUCC(tenant_holder.get_ptr()->io_result_pool_.alloc(io_result));
@ -900,12 +873,14 @@ TEST_F(TestIOManager, memory_pool)
io_result->tenant_io_mgr_.hold(tenant_holder.get_ptr());
ASSERT_TRUE(tenant_holder.get_ptr()->io_result_pool_.contain(io_result));
ASSERT_SUCC(tenant_holder.get_ptr()->io_result_pool_.recycle(io_result));
io_result->tenant_io_mgr_.reset();
void *result_buf = tenant_holder.get_ptr()->io_allocator_.alloc(sizeof(ObIOResult));
ObIOResult *result1 = new (result_buf) ObIOResult;
result1->tenant_io_mgr_.hold(tenant_holder.get_ptr());
ASSERT_FALSE(tenant_holder.get_ptr()->io_result_pool_.contain(result1));
ASSERT_FAIL(tenant_holder.get_ptr()->io_result_pool_.recycle(result1));
result1->~ObIOResult();
tenant_holder.get_ptr()->io_allocator_.free(result1);
void *req_buf = tenant_holder.get_ptr()->io_allocator_.alloc(sizeof(ObIORequest));
@ -913,6 +888,7 @@ TEST_F(TestIOManager, memory_pool)
req1->tenant_io_mgr_.hold(tenant_holder.get_ptr());
ASSERT_FALSE(tenant_holder.get_ptr()->io_request_pool_.contain(req1));
ASSERT_FAIL(tenant_holder.get_ptr()->io_request_pool_.recycle(req1));
req1->~ObIORequest();
tenant_holder.get_ptr()->io_allocator_.free(req1);
}
@ -1277,8 +1253,6 @@ TEST_F(TestIOManager, tenant)
{
ObTenantIOConfig default_config = ObTenantIOConfig::default_instance();
default_config.unit_config_.max_iops_ = 20000L;
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(1001, default_config));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(1002, default_config));
int64_t current_ts = ObTimeUtility::fast_current_time();
IOPerfLoad load;
load.group_id_ = 0;
@ -1303,8 +1277,6 @@ TEST_F(TestIOManager, tenant)
IOPerfRunner runner;
ASSERT_SUCC(runner.init(current_ts, load));
usleep(2L * 1000L * 1000L); // 2s
ASSERT_SUCC(OB_IO_MANAGER.remove_tenant_io_manager(1002));
ASSERT_SUCC(OB_IO_MANAGER.remove_tenant_io_manager(1001));
runner.wait();
runner.destroy();
}
@ -1348,7 +1320,6 @@ TEST_F(TestIOManager, perf)
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
LOG_INFO("wenqu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1422,7 +1393,6 @@ TEST_F(TestIOManager, alloc_memory)
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
curr_config.config_.memory_limit_ = 16L* 1024L * 1024L; //16MB
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1493,7 +1463,6 @@ TEST_F(TestIOManager, IOTracer)
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
LOG_INFO("wenqu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1580,7 +1549,6 @@ TEST_F(TestIOManager, ModifyIOPS)
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
LOG_INFO("wenqu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1667,7 +1635,6 @@ TEST_F(TestIOManager, ModifyCallbackThread)
for (int64_t i = 0; i < perf_tenants.count(); ++i) {
IOPerfTenant &curr_config = perf_tenants.at(i);
LOG_INFO("wenqu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1752,7 +1719,6 @@ TEST_F(TestIOManager, ModifyGroupIO)
IOPerfTenant &curr_config = perf_tenants.at(i);
if (curr_config.tenant_id_ == 1002) {
LOG_INFO("qilu: tenant config", K(curr_config), K(i));
ASSERT_SUCC(OB_IO_MANAGER.add_tenant_io_manager(curr_config.tenant_id_, curr_config.config_));
ObRefHolder<ObTenantIOManager> tenant_holder;
ASSERT_SUCC(OB_IO_MANAGER.get_tenant_io_manager(curr_config.tenant_id_, tenant_holder));
ASSERT_SUCC(tenant_holder.get_ptr()->refresh_group_io_config());
@ -1892,15 +1858,15 @@ void write_group_perf_config()
"1 0 8 1 64 1 ./perf_test\n"
"\n"
"tenant_id min_iops max_iops weight group\n"
"1001 5000 100000 700 10001: testgroup1: 80, 100, 60; 10002: testgroup2: 10, 60, 30; 0: OTHER_GROUPS: 10, 100, 10;\n"
"1002 1000 50000 1000 12345: testgroup1: 50, 50, 50; 0: OTHER_GROUPS: 50, 50, 50;\n"
"1 5000 100000 700 10001: testgroup1: 80, 100, 60; 10002: testgroup2: 10, 60, 30; 0: OTHER_GROUPS: 10, 100, 10;\n"
"500 1000 50000 1000 12345: testgroup1: 50, 50, 50; 0: OTHER_GROUPS: 50, 50, 50;\n"
"\n"
"tenant_id device_id group io_mode io_size_byte io_depth perf_mode target_iops thread_count is_sequence start_s stop_s\n"
"1001 1 0 r 16384 10 rolling 0 16 0 0 8\n"
"1001 1 10001 r 16384 10 rolling 0 16 0 2 7\n"
"1001 1 10002 r 16384 10 rolling 0 16 0 0 6\n"
"1002 1 0 r 16384 100 rolling 0 16 0 0 10\n"
"1002 1 12345 r 16384 100 rolling 0 16 0 0 10\n"
"1 1 0 r 16384 10 rolling 0 16 0 0 8\n"
"1 1 10001 r 16384 10 rolling 0 16 0 2 7\n"
"1 1 10002 r 16384 10 rolling 0 16 0 0 6\n"
"500 1 0 r 16384 100 rolling 0 16 0 0 10\n"
"500 1 12345 r 16384 100 rolling 0 16 0 0 10\n"
;
const int64_t file_len = strlen(file_buf);
int write_ret = ::write(fd, file_buf, file_len);

View File

@ -190,7 +190,11 @@ void TestParallelExternalSort::SetUp()
static ObTenantBase tenant_ctx(OB_SYS_TENANT_ID);
ObTenantEnv::set_tenant(&tenant_ctx);
ObTenantIOManager *io_service = nullptr;
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_new(io_service));
EXPECT_EQ(OB_SUCCESS, ObTenantIOManager::mtl_init(io_service));
EXPECT_EQ(OB_SUCCESS, io_service->start());
tenant_ctx.set(io_service);
ObTenantEnv::set_tenant(&tenant_ctx);
}
void TestParallelExternalSort::TearDown()