diff --git a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp index 0424701bd..3411b9633 100644 --- a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp +++ b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp @@ -63,7 +63,6 @@ public: static void TearDownTestCase(); }; static ObSimpleMemLimitGetter getter; -static const int64_t TEST_ROWKEY_COLUMN_CNT = 2; // ATTENTION! // currently, we only initialize modules about tmp file at the beginning of unit test and @@ -268,7 +267,6 @@ TEST_F(TestTmpFile, test_read) ret = MTL(ObTenantTmpFileManager *)->get_sn_file_manager().get_tmp_file(fd, file_handle); ASSERT_EQ(OB_SUCCESS, ret); file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY; - file_handle.reset(); ObTmpFileIOInfo io_info; io_info.fd_ = fd; @@ -282,11 +280,8 @@ TEST_F(TestTmpFile, test_read) write_time = ObTimeUtility::current_time() - write_time; ASSERT_EQ(OB_SUCCESS, ret); - ret = MTL(ObTenantTmpFileManager *)->get_sn_file_manager().get_tmp_file(fd, file_handle); - ASSERT_EQ(OB_SUCCESS, ret); int64_t wbp_begin_offset = file_handle.get()->cal_wbp_begin_offset(); ASSERT_GT(wbp_begin_offset, 0); - file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY; file_handle.reset(); int64_t read_time = ObTimeUtility::current_time(); @@ -1204,7 +1199,6 @@ TEST_F(TestTmpFile, test_write_last_page_during_flush) LOG_INFO("test_write_last_page_during_flush"); } -// generate 750MB random data. // this test will trigger flush and evict logic for both data and meta pages. void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpFileIOInfo io_info) { @@ -1239,7 +1233,7 @@ void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpF io_info.io_desc_.set_wait_event(2); io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; - // 1. write 750MB data + // 1. write data io_info.buf_ = write_buf; io_info.size_ = write_size; int64_t write_time = ObTimeUtility::current_time(); @@ -1247,7 +1241,7 @@ void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpF ASSERT_EQ(OB_SUCCESS, ret); write_time = ObTimeUtility::current_time() - write_time; - // 2. read 750MB data + // 2. read data ObTmpFileIOHandle handle; int64_t read_size = write_size; char *read_buf = new char [read_size]; @@ -1619,6 +1613,9 @@ TEST_F(TestTmpFile, test_multiple_small_files) STORAGE_LOG(INFO, "io time", K(io_time)); } +// ATTENTION +// the case after this will increase wbp_mem_limit to BIG_WBP_MEM_LIMIT. +// And it will never be decreased as long as it has been increased TEST_F(TestTmpFile, test_big_file) { const int64_t write_size = 750 * 1024 * 1024; // write 750MB data @@ -1649,7 +1646,6 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache) test_big_file(write_size, wbp_mem_limit, io_info); } -// TODO, xuwei, enbale later TEST_F(TestTmpFile, test_aio_pread) { int ret = OB_SUCCESS; @@ -1733,7 +1729,7 @@ TEST_F(TestTmpFile, test_aio_pread) ret = MTL(ObTenantTmpFileManager *)->remove(fd); ASSERT_EQ(OB_SUCCESS, ret); - LOG_INFO("test_cached_read"); + LOG_INFO("test_aio_pread"); } } // namespace oceanbase diff --git a/src/observer/virtual_table/ob_all_virtual_tmp_file.cpp b/src/observer/virtual_table/ob_all_virtual_tmp_file.cpp index 00417eb87..cede22e8e 100644 --- a/src/observer/virtual_table/ob_all_virtual_tmp_file.cpp +++ b/src/observer/virtual_table/ob_all_virtual_tmp_file.cpp @@ -81,7 +81,7 @@ int ObAllVirtualTmpFileInfo::get_next_tmp_file_info_(tmp_file::ObSNTmpFileInfo & if (fd_idx_ >= fd_arr_.count()) { ret = OB_ITER_END; SERVER_LOG(INFO, "iterate current tenant reach end", K(fd_idx_), K(fd_arr_.count())); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_info(fd_arr_.at(fd_idx_), tmp_file_info))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_info(MTL_ID(), fd_arr_.at(fd_idx_), tmp_file_info))) { if (OB_ENTRY_NOT_EXIST == ret || OB_TIMEOUT == ret) { SERVER_LOG(INFO, "tmp file does not exist or is locked by others", KR(ret), K(fd_arr_.at(fd_idx_))); ret = OB_SUCCESS; @@ -111,7 +111,7 @@ int ObAllVirtualTmpFileInfo::process_curr_tenant(common::ObNewRow *&row) if (OB_UNLIKELY(!fd_arr_.empty())) { ret = OB_ERR_UNEXPECTED; SERVER_LOG(WARN, "unexpected fd_arr_", KR(ret), K(fd_arr_)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_fds(fd_arr_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_fds(MTL_ID(), fd_arr_))) { SERVER_LOG(WARN, "fail to get tmp file fd arr", KR(ret)); if (OB_NOT_INIT == ret) { ret = OB_SUCCESS; diff --git a/src/sql/dtl/ob_dtl_interm_result_manager.cpp b/src/sql/dtl/ob_dtl_interm_result_manager.cpp index fdbe21c3d..c0f5ca649 100644 --- a/src/sql/dtl/ob_dtl_interm_result_manager.cpp +++ b/src/sql/dtl/ob_dtl_interm_result_manager.cpp @@ -271,7 +271,7 @@ int ObDTLIntermResultManager::insert_interm_result_info(ObDTLIntermResultKey &ke // The code here is mainly for the use of the temp_table. // For the px module, // the dir_id has already been set in the previous access_mem_profile. - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(result_info->get_tenant_id(), dir_id_))) { LOG_WARN("allocate file directory failed", K(ret)); } else { DTL_IR_STORE_DO(*result_info, set_dir_id, dir_id_); diff --git a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp index 6928514dc..06bc78224 100644 --- a/src/sql/engine/aggregate/ob_merge_groupby_op.cpp +++ b/src/sql/engine/aggregate/ob_merge_groupby_op.cpp @@ -243,7 +243,7 @@ int ObMergeGroupByOp::init_group_rows() int ObMergeGroupByOp::init() { int ret = OB_SUCCESS; - if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) { } else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) { diff --git a/src/sql/engine/aggregate/ob_merge_groupby_vec_op.cpp b/src/sql/engine/aggregate/ob_merge_groupby_vec_op.cpp index 222ff029c..b45eea5f9 100644 --- a/src/sql/engine/aggregate/ob_merge_groupby_vec_op.cpp +++ b/src/sql/engine/aggregate/ob_merge_groupby_vec_op.cpp @@ -544,7 +544,7 @@ int ObMergeGroupByVecOp::init() int ret = OB_SUCCESS; group_rows_.set_tenant_id(MTL_ID()); group_rows_.set_ctx_id(ObCtxIds::DEFAULT_CTX_ID); - if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) { } else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) { diff --git a/src/sql/engine/aggregate/ob_scalar_aggregate_op.cpp b/src/sql/engine/aggregate/ob_scalar_aggregate_op.cpp index 11678566d..b4d2042bf 100644 --- a/src/sql/engine/aggregate/ob_scalar_aggregate_op.cpp +++ b/src/sql/engine/aggregate/ob_scalar_aggregate_op.cpp @@ -27,9 +27,10 @@ OB_SERIALIZE_MEMBER((ObScalarAggregateSpec, ObGroupBySpec), enable_hash_base_dis int ObScalarAggregateOp::inner_open() { int ret = OB_SUCCESS; + uint64_t tenant_id = ctx_.get_my_session()->get_effective_tenant_id(); if (OB_FAIL(ObGroupByOp::inner_open())) { LOG_WARN("failed to inner_open", K(ret)); - } else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + } else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) { } else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) { @@ -40,7 +41,7 @@ int ObScalarAggregateOp::inner_open() LOG_WARN("failed to init one group", K(ret)); } else { bool need_dir_id = aggr_processor_.processor_need_alloc_dir_id(); - if (need_dir_id && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (need_dir_id && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (need_dir_id && FALSE_IT(aggr_processor_.set_dir_id(dir_id_))) { } else if (FALSE_IT(aggr_processor_.set_io_event_observer(&io_event_observer_))) { diff --git a/src/sql/engine/aggregate/ob_scalar_aggregate_vec_op.cpp b/src/sql/engine/aggregate/ob_scalar_aggregate_vec_op.cpp index fd5c437bb..97a21f9f0 100644 --- a/src/sql/engine/aggregate/ob_scalar_aggregate_vec_op.cpp +++ b/src/sql/engine/aggregate/ob_scalar_aggregate_vec_op.cpp @@ -27,7 +27,7 @@ int ObScalarAggregateVecOp::inner_open() int ret = OB_SUCCESS; if (OB_FAIL(ObGroupByVecOp::inner_open())) { LOG_WARN("groupby inner open failed", K(ret)); - } else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + } else if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(ctx_.get_my_session()->get_effective_tenant_id(), dir_id_))) { LOG_WARN("failed to allocate dir id", K(ret)); } else if (OB_FAIL(init_mem_context())) { LOG_WARN("init memory context failed", K(ret)); diff --git a/src/sql/engine/basic/ob_chunk_datum_store.cpp b/src/sql/engine/basic/ob_chunk_datum_store.cpp index 803e570e0..56899dcb0 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.cpp +++ b/src/sql/engine/basic/ob_chunk_datum_store.cpp @@ -570,7 +570,7 @@ ObChunkDatumStore::ObChunkDatumStore(const ObLabel &label, common::ObIAllocator } int ObChunkDatumStore::init(int64_t mem_limit, - uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */, + uint64_t tenant_id, int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */, const char *label /* = common::ObModIds::OB_SQL_CHUNK_ROW_STORE) */, bool enable_dump /* = true */, @@ -602,7 +602,7 @@ void ObChunkDatumStore::reset() int ret = OB_SUCCESS; if (is_file_open()) { aio_write_handle_.reset(); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) { LOG_WARN("remove file failed", K(ret), K_(io_.fd)); } else { LOG_INFO("close file success", K(ret), K_(io_.fd)); @@ -2099,7 +2099,7 @@ int ObChunkDatumStore::get_timeout(int64_t &timeout_ms) int ObChunkDatumStore::alloc_dir_id() { int ret = OB_SUCCESS; - if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(io_.dir_id_))) { + if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, io_.dir_id_))) { LOG_WARN("allocate file directory failed", K(ret)); } return ret; @@ -2122,7 +2122,7 @@ int ObChunkDatumStore::write_file(void *buf, int64_t size) if (-1 == io_.dir_id_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("temp file dir id is not init", K(ret), K(io_.dir_id_)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) { LOG_WARN("open file failed", K(ret)); } else { file_size_ = 0; @@ -2137,7 +2137,7 @@ int ObChunkDatumStore::write_file(void *buf, int64_t size) set_io(size, static_cast(buf)); if (aio_write_handle_.is_valid() && OB_FAIL(aio_write_handle_.wait())) { LOG_WARN("failed to wait write", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_, aio_write_handle_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_, aio_write_handle_))) { LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms)); } } @@ -2186,9 +2186,9 @@ int ObChunkDatumStore::read_file( tmp_io.io_timeout_ms_ = timeout_ms; if (0 == read_size - && OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_io.fd_, tmp_file_size))) { + && OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(tenant_id_, tmp_io.fd_, tmp_file_size))) { LOG_WARN("failed to get tmp file size", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(tmp_io, offset, handle))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, tmp_io, offset, handle))) { if (OB_ITER_END != ret) { LOG_WARN("read form file failed", K(ret), K(tmp_io), K(offset), K(timeout_ms)); } @@ -2220,7 +2220,7 @@ int ObChunkDatumStore::aio_read_file( tmp_io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_READ); if (OB_FAIL(get_timeout(tmp_io.io_timeout_ms_))) { LOG_WARN("get timeout failed", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(tmp_io, offset, handle))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(tenant_id_, tmp_io, offset, handle))) { if (OB_ITER_END != ret) { LOG_WARN("read form file failed", K(ret), K(tmp_io), K(offset)); } diff --git a/src/sql/engine/basic/ob_chunk_datum_store.h b/src/sql/engine/basic/ob_chunk_datum_store.h index f60710bbf..274a88ffa 100644 --- a/src/sql/engine/basic/ob_chunk_datum_store.h +++ b/src/sql/engine/basic/ob_chunk_datum_store.h @@ -969,7 +969,7 @@ public: virtual ~ObChunkDatumStore() { reset(); } int init(int64_t mem_limit, - uint64_t tenant_id = common::OB_SERVER_TENANT_ID, + uint64_t tenant_id, int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID, const char *label = common::ObModIds::OB_SQL_CHUNK_ROW_STORE, bool enable_dump = true, diff --git a/src/sql/engine/basic/ob_chunk_row_store.cpp b/src/sql/engine/basic/ob_chunk_row_store.cpp index 633a672e2..466ab5c88 100644 --- a/src/sql/engine/basic/ob_chunk_row_store.cpp +++ b/src/sql/engine/basic/ob_chunk_row_store.cpp @@ -242,7 +242,7 @@ ObChunkRowStore::ObChunkRowStore(common::ObIAllocator *alloc /* = NULL */) } int ObChunkRowStore::init(int64_t mem_limit, - uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */, + uint64_t tenant_id, int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */, const char *label /* = common::ObNewModIds::OB_SQL_CHUNK_ROW_STORE) */, bool enable_dump /* = true */, @@ -278,7 +278,6 @@ int ObChunkRowStore::init(int64_t mem_limit, void ObChunkRowStore::reset() { int ret = OB_SUCCESS; - tenant_id_ = common::OB_SERVER_TENANT_ID; label_ = common::ObModIds::OB_SQL_ROW_STORE; ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID; @@ -286,13 +285,14 @@ void ObChunkRowStore::reset() row_cnt_ = 0; if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) { LOG_WARN("remove file failed", K(ret), K_(io_.fd)); } else { LOG_TRACE("close file success", K(ret), K_(io_.fd)); } io_.fd_ = -1; } + tenant_id_ = common::OB_SERVER_TENANT_ID; n_block_in_file_ = 0; while (!blocks_.is_empty()) { @@ -348,7 +348,7 @@ void ObChunkRowStore::reuse() { int ret = OB_SUCCESS; if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) { LOG_WARN("remove file failed", K(ret), K_(io_.fd)); } else { LOG_TRACE("close file success", K(ret), K_(io_.fd)); @@ -1598,7 +1598,7 @@ int ObChunkRowStore::get_timeout(int64_t &timeout_ms) int ObChunkRowStore::alloc_dir_id() { int ret = OB_SUCCESS; - if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(io_.dir_id_))) { + if (-1 == io_.dir_id_ && OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, io_.dir_id_))) { LOG_WARN("allocate file directory failed", K(ret)); } return ret; @@ -1621,7 +1621,7 @@ int ObChunkRowStore::write_file(void *buf, int64_t size) if (-1 == io_.dir_id_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("temp file dir id is not init", K(ret), K(io_.dir_id_)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) { LOG_WARN("open file failed", K(ret)); } else { file_size_ = 0; @@ -1634,7 +1634,7 @@ int ObChunkRowStore::write_file(void *buf, int64_t size) } if (OB_SUCC(ret) && size > 0) { set_io(size, static_cast(buf)); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io_))) { LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms)); } } @@ -1673,9 +1673,9 @@ int ObChunkRowStore::read_file(void *buf, const int64_t size, const int64_t offs io_.io_timeout_ms_ = timeout_ms; tmp_file::ObTmpFileIOHandle handle; if (0 == read_size - && OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(io_.fd_, tmp_file_size))) { + && OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size(tenant_id_, io_.fd_, tmp_file_size))) { LOG_WARN("failed to get tmp file size", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_, offset, handle))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io_, offset, handle))) { if (OB_ITER_END != ret) { LOG_WARN("read form file failed", K(ret), K(io_), K(offset), K(timeout_ms)); } @@ -1701,11 +1701,11 @@ bool ObChunkRowStore::need_dump(int64_t extra_size) return dump; } -int ObChunkStoreUtil::alloc_dir_id(int64_t &dir_id) +int ObChunkStoreUtil::alloc_dir_id(const uint64_t tenant_id, int64_t &dir_id) { int ret = OB_SUCCESS; dir_id = 0; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id, dir_id))) { LOG_WARN("allocate file directory failed", K(ret)); } return ret; diff --git a/src/sql/engine/basic/ob_chunk_row_store.h b/src/sql/engine/basic/ob_chunk_row_store.h index b82d2ef8f..35d22a49b 100644 --- a/src/sql/engine/basic/ob_chunk_row_store.h +++ b/src/sql/engine/basic/ob_chunk_row_store.h @@ -354,7 +354,7 @@ public: virtual ~ObChunkRowStore() { reset(); } int init(int64_t mem_limit, - uint64_t tenant_id = common::OB_SERVER_TENANT_ID, + uint64_t tenant_id, int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID, const char *label = common::ObModIds::OB_SQL_CHUNK_ROW_STORE, bool enable_dump = true, @@ -535,7 +535,7 @@ inline int ObChunkRowStore::BlockBuffer::advance(int64_t size) class ObChunkStoreUtil { public: - static int alloc_dir_id(int64_t &dir_id); + static int alloc_dir_id(const uint64_t tenant_id, int64_t &dir_id); }; } // end namespace sql diff --git a/src/sql/engine/basic/ob_ra_datum_store.cpp b/src/sql/engine/basic/ob_ra_datum_store.cpp index fe569f512..9f58c7d9c 100644 --- a/src/sql/engine/basic/ob_ra_datum_store.cpp +++ b/src/sql/engine/basic/ob_ra_datum_store.cpp @@ -358,7 +358,7 @@ ObRADatumStore::ObRADatumStore(common::ObIAllocator *alloc /* = NULL */) } int ObRADatumStore::init(int64_t mem_limit, - uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */, + uint64_t tenant_id, int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */, const char *label /* = common::ObModIds::OB_SQL_ROW_STORE) */, uint32_t row_extend_size /* = 0 */) @@ -398,7 +398,6 @@ void ObRADatumStore::inc_mem_hold(int64_t hold) void ObRADatumStore::reset() { int ret = OB_SUCCESS; - tenant_id_ = common::OB_SERVER_TENANT_ID; label_ = common::ObModIds::OB_SQL_ROW_STORE; ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID; mem_limit_ = 0; @@ -415,7 +414,7 @@ void ObRADatumStore::reset() inner_reader_.reset(); if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) { LOG_WARN("remove file failed", K(ret), K_(fd)); } else { LOG_INFO("close file success", K(ret), K_(fd)); @@ -424,6 +423,7 @@ void ObRADatumStore::reset() dir_id_ = -1; file_size_ = 0; } + tenant_id_ = common::OB_SERVER_TENANT_ID; while (!blk_mem_list_.is_empty()) { LinkNode *node = blk_mem_list_.remove_first(); @@ -445,7 +445,7 @@ void ObRADatumStore::reuse() row_cnt_ = 0; inner_reader_.reset(); if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) { LOG_WARN("remove file failed", K(ret), K_(fd)); } else { LOG_INFO("close file success", K(ret), K_(fd)); @@ -1253,9 +1253,9 @@ int ObRADatumStore::write_file(BlockIndex &bi, void *buf, int64_t size) LOG_WARN("get timeout failed", K(ret)); } else { if (!is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, dir_id_))) { LOG_WARN("alloc file directory failed", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, fd_, dir_id_))) { LOG_WARN("open file failed", K(ret)); } else { file_size_ = 0; @@ -1275,7 +1275,7 @@ int ObRADatumStore::write_file(BlockIndex &bi, void *buf, int64_t size) io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_WRITE); io.io_timeout_ms_ = timeout_ms; const uint64_t start = rdtsc(); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io))) { LOG_WARN("write to file failed", K(ret), K(io), K(timeout_ms)); } if (NULL != io_observer_) { @@ -1316,7 +1316,7 @@ int ObRADatumStore::read_file(void *buf, const int64_t size, const int64_t offse io.io_timeout_ms_ = timeout_ms; const uint64_t start = rdtsc(); tmp_file::ObTmpFileIOHandle handle; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io, offset, handle))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io, offset, handle))) { LOG_WARN("read form file failed", K(ret), K(io), K(offset), K(timeout_ms)); } else if (OB_UNLIKELY(handle.get_done_size() != size)) { ret = OB_INNER_STAT_ERROR; diff --git a/src/sql/engine/basic/ob_ra_datum_store.h b/src/sql/engine/basic/ob_ra_datum_store.h index 5839d5d47..1440219cd 100644 --- a/src/sql/engine/basic/ob_ra_datum_store.h +++ b/src/sql/engine/basic/ob_ra_datum_store.h @@ -344,7 +344,7 @@ public: virtual ~ObRADatumStore() { reset(); } int init(int64_t mem_limit, - uint64_t tenant_id = common::OB_SERVER_TENANT_ID, + uint64_t tenant_id, int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID, const char *label = common::ObModIds::OB_SQL_ROW_STORE, uint32_t row_extend_size = 0); diff --git a/src/sql/engine/basic/ob_ra_row_store.cpp b/src/sql/engine/basic/ob_ra_row_store.cpp index e6abade66..656719062 100644 --- a/src/sql/engine/basic/ob_ra_row_store.cpp +++ b/src/sql/engine/basic/ob_ra_row_store.cpp @@ -205,7 +205,7 @@ ObRARowStore::ObRARowStore(common::ObIAllocator *alloc /* = NULL */, } int ObRARowStore::init(int64_t mem_limit, - uint64_t tenant_id /* = common::OB_SERVER_TENANT_ID */, + uint64_t tenant_id, int64_t mem_ctx_id /* = common::ObCtxIds::DEFAULT_CTX_ID */, const char *label /* = common::ObNewModIds::OB_SQL_ROW_STORE) */) { @@ -226,7 +226,6 @@ int ObRARowStore::init(int64_t mem_limit, void ObRARowStore::reset() { int ret = OB_SUCCESS; - tenant_id_ = common::OB_SERVER_TENANT_ID; label_ = common::ObModIds::OB_SQL_ROW_STORE; ctx_id_ = common::ObCtxIds::DEFAULT_CTX_ID; mem_limit_ = 0; @@ -239,7 +238,7 @@ void ObRARowStore::reset() inner_reader_.reset(); if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) { LOG_WARN("remove file failed", K(ret), K_(fd)); } else { LOG_INFO("close file success", K(ret), K_(fd)); @@ -249,6 +248,7 @@ void ObRARowStore::reset() file_size_ = 0; } + tenant_id_ = common::OB_SERVER_TENANT_ID; while (!blk_mem_list_.is_empty()) { LinkNode *node = blk_mem_list_.remove_first(); if (NULL != node) { @@ -273,7 +273,7 @@ void ObRARowStore::reuse() row_cnt_ = 0; inner_reader_.reuse(); if (is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) { LOG_WARN("remove file failed", K(ret), K_(fd)); } else { LOG_INFO("close file success", K(ret), K_(fd)); @@ -950,9 +950,9 @@ int ObRARowStore::write_file(BlockIndex &bi, void *buf, int64_t size) LOG_WARN("get timeout failed", K(ret)); } else { if (!is_file_open()) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, dir_id_))) { LOG_WARN("alloc file directory failed", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, fd_, dir_id_))) { LOG_WARN("open file failed", K(ret)); } else { file_size_ = 0; @@ -968,7 +968,7 @@ int ObRARowStore::write_file(BlockIndex &bi, void *buf, int64_t size) io.size_ = size; io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_WRITE); io.io_timeout_ms_ = timeout_ms; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.write(io))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.write(tenant_id_, io))) { LOG_WARN("write to file failed", K(ret), K(io), K(timeout_ms)); } } @@ -1003,7 +1003,7 @@ int ObRARowStore::read_file(void *buf, const int64_t size, const int64_t offset) io.io_desc_.set_wait_event(ObWaitEventIds::ROW_STORE_DISK_READ); io.io_timeout_ms_ = timeout_ms; tmp_file::ObTmpFileIOHandle handle; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io, offset, handle))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, io, offset, handle))) { LOG_WARN("read form file failed", K(ret), K(io), K(offset), K(timeout_ms)); } else if (handle.get_done_size() != size) { ret = OB_INNER_STAT_ERROR; diff --git a/src/sql/engine/basic/ob_ra_row_store.h b/src/sql/engine/basic/ob_ra_row_store.h index 596acd2a5..bb9704369 100644 --- a/src/sql/engine/basic/ob_ra_row_store.h +++ b/src/sql/engine/basic/ob_ra_row_store.h @@ -219,7 +219,7 @@ public: virtual ~ObRARowStore() { reset(); } int init(int64_t mem_limit, - uint64_t tenant_id = common::OB_SERVER_TENANT_ID, + uint64_t tenant_id, int64_t mem_ctx_id = common::ObCtxIds::DEFAULT_CTX_ID, const char *label = common::ObModIds::OB_SQL_ROW_STORE); diff --git a/src/sql/engine/basic/ob_temp_block_store.cpp b/src/sql/engine/basic/ob_temp_block_store.cpp index 65047c928..a04725726 100644 --- a/src/sql/engine/basic/ob_temp_block_store.cpp +++ b/src/sql/engine/basic/ob_temp_block_store.cpp @@ -103,7 +103,7 @@ void ObTempBlockStore::reset() if (is_file_open()) { write_io_handle_.reset(); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) { LOG_WARN("remove file failed", K(ret), K_(io_.fd)); } else { LOG_INFO("close file success", K(ret), K_(io_.fd), K_(file_size)); @@ -126,7 +126,7 @@ void ObTempBlockStore::reuse() inner_reader_.reset(); if (is_file_open()) { write_io_handle_.reset(); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(io_.fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, io_.fd_))) { LOG_WARN("remove file failed", K(ret), K_(io_.fd)); } else { LOG_INFO("close file success", K(ret), K_(io_.fd), K_(file_size)); @@ -178,7 +178,7 @@ int ObTempBlockStore::alloc_dir_id() int ret = OB_SUCCESS; if (-1 == io_.dir_id_) { io_.dir_id_ = 0; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(io_.dir_id_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id_, io_.dir_id_))) { LOG_WARN("allocate file directory failed", K(ret)); } } @@ -1090,7 +1090,7 @@ int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size) if (!is_file_open()) { if (OB_FAIL(alloc_dir_id())) { LOG_WARN("alloc file directory failed", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(io_.fd_, io_.dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id_, io_.fd_, io_.dir_id_))) { LOG_WARN("open file failed", K(ret)); } else { file_size_ = 0; @@ -1107,7 +1107,7 @@ int ObTempBlockStore::write_file(BlockIndex &bi, void *buf, int64_t size) const uint64_t start = rdtsc(); if (write_io_handle_.is_valid() && OB_FAIL(write_io_handle_.wait())) { LOG_WARN("fail to wait write", K(ret), K(write_io_handle_)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_, write_io_handle_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_, write_io_handle_))) { LOG_WARN("write to file failed", K(ret), K_(io), K(timeout_ms)); } if (NULL != io_observer_) { @@ -1147,11 +1147,11 @@ int ObTempBlockStore::read_file(void *buf, const int64_t size, const int64_t off tmp_read_id.io_timeout_ms_ = timeout_ms; const uint64_t start = rdtsc(); if (is_async) { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(tmp_read_id, offset, handle))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(tenant_id_, tmp_read_id, offset, handle))) { LOG_WARN("read form file failed", K(ret), K(tmp_read_id), K(offset), K(timeout_ms)); } } else { - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(tmp_read_id, offset, handle))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(tenant_id_, tmp_read_id, offset, handle))) { LOG_WARN("read form file failed", K(ret), K(tmp_read_id), K(offset), K(timeout_ms)); } else if (OB_UNLIKELY(handle.get_done_size() != size)) { ret = OB_INNER_STAT_ERROR; @@ -1597,7 +1597,7 @@ int ObTempBlockStore::truncate_file(int64_t offset) if (!is_inited()) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.truncate(get_file_fd(), offset))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.truncate(tenant_id_, get_file_fd(), offset))) { LOG_WARN("truncate failed", K(ret), K(get_file_fd()), K(offset)); } return ret; diff --git a/src/sql/engine/ob_sql_mem_mgr_processor.cpp b/src/sql/engine/ob_sql_mem_mgr_processor.cpp index 3ba1737e7..cc763b265 100644 --- a/src/sql/engine/ob_sql_mem_mgr_processor.cpp +++ b/src/sql/engine/ob_sql_mem_mgr_processor.cpp @@ -316,7 +316,7 @@ int ObSqlMemMgrProcessor::alloc_dir_id(int64_t &dir_id) { int ret = OB_SUCCESS; if (0 == dir_id_) { - if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id_, dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } } diff --git a/src/sql/engine/window_function/ob_window_function_op.cpp b/src/sql/engine/window_function/ob_window_function_op.cpp index d24ef32d6..86b058548 100644 --- a/src/sql/engine/window_function/ob_window_function_op.cpp +++ b/src/sql/engine/window_function/ob_window_function_op.cpp @@ -1229,7 +1229,7 @@ int ObWindowFunctionOp::init() func_alloc.local_allocator_ = &local_allocator_; int64_t prev_pushdown_pby_col_count = -1; WFInfoFixedArray &wf_infos = *const_cast(&MY_SPEC.wf_infos_); - if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (OB_FAIL(curr_row_collect_values_.prepare_allocate(wf_infos.count()))) { LOG_WARN("cur row collect values prepare allocate failed", K(ret)); diff --git a/src/sql/engine/window_function/ob_window_function_vec_op.cpp b/src/sql/engine/window_function/ob_window_function_vec_op.cpp index abd9f3870..5f09ed243 100644 --- a/src/sql/engine/window_function/ob_window_function_vec_op.cpp +++ b/src/sql/engine/window_function/ob_window_function_vec_op.cpp @@ -698,7 +698,7 @@ int ObWindowFunctionVecOp::init() all_part_exprs_.set_attr(attr); int prev_pushdown_pby_col_count = -1; WFInfoFixedArray &wf_infos = const_cast(MY_SPEC.wf_infos_); - if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(dir_id_))) { + if (OB_FAIL(ObChunkStoreUtil::alloc_dir_id(tenant_id, dir_id_))) { LOG_WARN("failed to alloc dir id", K(ret)); } else if (MY_SPEC.max_batch_size_ > 0) { if (OB_FAIL(all_expr_vector_copy_.init(child_->get_spec().output_, eval_ctx_))) { diff --git a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp index 38da37d16..f5aab3e66 100644 --- a/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp +++ b/src/storage/ddl/ob_direct_insert_sstable_ctx_new.cpp @@ -2477,7 +2477,7 @@ int ObTabletFullDirectLoadMgr::open(const int64_t current_execution_id, share::S } else if (OB_UNLIKELY(!tablet_handle.is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("tablet handle is invalid", K(ret), K(tablet_handle)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(MTL_ID(), dir_id_))) { LOG_WARN("alloc dir id failed", K(ret)); } else if (current_execution_id < execution_id_ || current_execution_id < tablet_handle.get_obj()->get_tablet_meta().ddl_execution_id_) { diff --git a/src/storage/direct_load/ob_direct_load_tmp_file.cpp b/src/storage/direct_load/ob_direct_load_tmp_file.cpp index ec64853cd..508e39305 100644 --- a/src/storage/direct_load/ob_direct_load_tmp_file.cpp +++ b/src/storage/direct_load/ob_direct_load_tmp_file.cpp @@ -195,8 +195,8 @@ int ObDirectLoadTmpFileIOHandle::open(const ObDirectLoadTmpFileHandle &file_hand if (OB_UNLIKELY(!file_handle.is_valid())) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid args", KR(ret), K(file_handle)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file->get_file_id().fd_, - file_size))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size( + MTL_ID(), tmp_file->get_file_id().fd_, file_size))) { LOG_WARN("fail to get tmp file size", KR(ret), KPC(tmp_file)); } else if (OB_UNLIKELY(file_size != tmp_file->get_file_size())) { ret = OB_ERR_UNEXPECTED; @@ -251,7 +251,7 @@ int ObDirectLoadTmpFileIOHandle::pread(char *buf, int64_t size, int64_t offset) while (OB_SUCC(ret)) { if (OB_FAIL(check_status())) { LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.pread(io_info_, offset, file_io_handle_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.pread(MTL_ID(), io_info_, offset, file_io_handle_))) { LOG_WARN("fail to do pread from tmp file", KR(ret), K_(io_info), K(offset)); if (OB_LIKELY(is_retry_err(ret))) { if (++retry_cnt <= MAX_RETRY_CNT) { @@ -290,14 +290,14 @@ int ObDirectLoadTmpFileIOHandle::write(char *buf, int64_t size) LOG_WARN("fail to check status", KR(ret)); } // TODO(suzhi.yt): 先保留原来的调用, aio_write提交成功就相当于写成功了 - else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) { + else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(MTL_ID(), io_info_, file_io_handle_))) { LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info)); if (OB_LIKELY(is_retry_err(ret))) { if (++retry_cnt <= MAX_RETRY_CNT) { ret = OB_SUCCESS; int64_t new_file_size = 0; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_, - new_file_size))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size( + MTL_ID(), tmp_file_->get_file_id().fd_, new_file_size))) { LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file)); } else { const int64_t write_size = new_file_size - tmp_file_->get_file_size(); @@ -342,7 +342,7 @@ int ObDirectLoadTmpFileIOHandle::aio_pread(char *buf, int64_t size, int64_t offs while (OB_SUCC(ret)) { if (OB_FAIL(check_status())) { LOG_WARN("fail to check status", KR(ret)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_pread(io_info_, offset, file_io_handle_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_pread(MTL_ID(), io_info_, offset, file_io_handle_))) { LOG_WARN("fail to do aio pread from tmp file", KR(ret), K_(io_info), K(offset)); if (OB_LIKELY(is_retry_err(ret))) { if (++retry_cnt <= MAX_RETRY_CNT) { @@ -381,14 +381,14 @@ int ObDirectLoadTmpFileIOHandle::aio_write(char *buf, int64_t size) LOG_WARN("fail to check status", KR(ret)); } // aio_write提交成功就相当于写成功了 - else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info_, file_io_handle_))) { + else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(MTL_ID(), io_info_, file_io_handle_))) { LOG_WARN("fail to do aio write to tmp file", KR(ret), K_(io_info)); if (OB_LIKELY(is_retry_err(ret))) { if (++retry_cnt <= MAX_RETRY_CNT) { ret = OB_SUCCESS; int64_t new_file_size = 0; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.get_tmp_file_size(tmp_file_->get_file_id().fd_, - new_file_size))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.get_tmp_file_size( + MTL_ID(), tmp_file_->get_file_id().fd_, new_file_size))) { LOG_WARN("fail to get tmp file size", KR(ret), KPC_(tmp_file)); } else { const int64_t write_size = new_file_size - tmp_file_->get_file_size(); @@ -478,7 +478,7 @@ int ObDirectLoadTmpFileManager::alloc_dir(int64_t &dir_id) if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("ObDirectLoadTmpFileManager not init", KR(ret), KP(this)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(MTL_ID(), dir_id))) { LOG_WARN("fail to alloc dir", KR(ret)); } return ret; @@ -498,7 +498,7 @@ int ObDirectLoadTmpFileManager::alloc_file(int64_t dir_id, ObDirectLoadTmpFile *tmp_file = nullptr; ObDirectLoadTmpFileId file_id; file_id.dir_id_ = dir_id; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(file_id.fd_, file_id.dir_id_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(MTL_ID(), file_id.fd_, file_id.dir_id_))) { LOG_WARN("fail to open file", KR(ret)); } else if (OB_ISNULL(tmp_file = file_allocator_.alloc(this, file_id))) { ret = OB_ALLOCATE_MEMORY_FAILED; @@ -512,7 +512,7 @@ int ObDirectLoadTmpFileManager::alloc_file(int64_t dir_id, tmp_file = nullptr; } if (file_id.is_valid()) { - FILE_MANAGER_INSTANCE_V2.remove(file_id.fd_); + FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(MTL_ID(), file_id.fd_); } } } @@ -531,7 +531,7 @@ void ObDirectLoadTmpFileManager::put_file(ObDirectLoadTmpFile *tmp_file) } else { const int64_t ref_count = tmp_file->get_ref_count(); if (0 == ref_count) { - FILE_MANAGER_INSTANCE_V2.remove(tmp_file->get_file_id().fd_); + FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(MTL_ID(), tmp_file->get_file_id().fd_); file_allocator_.free(tmp_file); } else { LOG_ERROR("tmp file ref count must be zero", K(ref_count)); diff --git a/src/storage/ob_parallel_external_sort.h b/src/storage/ob_parallel_external_sort.h index 5d95f80e7..65c1d9ac2 100644 --- a/src/storage/ob_parallel_external_sort.h +++ b/src/storage/ob_parallel_external_sort.h @@ -236,7 +236,7 @@ int ObFragmentWriterV2::open(const int64_t buf_size, const int64_t expire_tim if (NULL == (buf_ = static_cast(allocator_.alloc(align_buf_size)))) { ret = common::OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "fail to allocate buffer", K(ret), K(align_buf_size)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.open(fd_, dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.open(tenant_id, fd_, dir_id_))) { STORAGE_LOG(WARN, "fail to open file", K(ret)); } else { buf_size_ = align_buf_size; @@ -318,7 +318,7 @@ int ObFragmentWriterV2::flush_buffer() io_info.buf_ = buf_; io_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_WRITE); io_info.io_timeout_ms_ = timeout_ms; - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_write(io_info, file_io_handle_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_write(tenant_id_, io_info, file_io_handle_))) { STORAGE_LOG(WARN, "fail to do aio write macro file", K(ret), K(io_info)); } else { macro_buffer_writer_.assign(ObExternalSortConstant::BUF_HEADER_LENGTH, buf_size_, buf_); @@ -565,7 +565,7 @@ int ObFragmentReaderV2::prefetch() io_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_INDEX_BUILD_READ); if (OB_FAIL(ObExternalSortConstant::get_io_timeout_ms(expire_timestamp_, io_info.io_timeout_ms_))) { STORAGE_LOG(WARN, "fail to get io timeout ms", K(ret), K(expire_timestamp_), K(io_info.io_timeout_ms_)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.aio_read(io_info, file_io_handles_[handle_index]))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.aio_read(tenant_id_, io_info, file_io_handles_[handle_index]))) { if (common::OB_ITER_END != ret) { STORAGE_LOG(WARN, "fail to do aio read from macro file", K(ret), K(fd_), K(io_info)); } else { @@ -684,7 +684,7 @@ int ObFragmentReaderV2::clean_up() for (int64_t i = 0; i < MAX_HANDLE_COUNT; ++i) { file_io_handles_[i].reset(); } - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.remove(fd_))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.remove(tenant_id_, fd_))) { STORAGE_LOG(WARN, "fail to remove macro file", K(ret)); } reset(); @@ -1070,7 +1070,7 @@ int ObExternalSortRound::init( ret = common::OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "invalid argument", K(ret), K(merge_count), K(file_buf_size), KP(compare)); - } else if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id_))) { + } else if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(tenant_id, dir_id_))) { STORAGE_LOG(WARN, "fail to alloc dir", K(ret)); } else { is_inited_ = true; diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp index a49e14cdf..446013c5b 100644 --- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp +++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp @@ -286,7 +286,7 @@ ObSharedNothingTmpFile::ObSharedNothingTmpFile() ObSharedNothingTmpFile::~ObSharedNothingTmpFile() { - destroy(); + reset(); } int ObSharedNothingTmpFile::init(const uint64_t tenant_id, const int64_t fd, const int64_t dir_id, @@ -344,13 +344,13 @@ int ObSharedNothingTmpFile::init(const uint64_t tenant_id, const int64_t fd, con return ret; } -int ObSharedNothingTmpFile::destroy() +int ObSharedNothingTmpFile::release_resource() { int ret = OB_SUCCESS; int64_t fd_backup = fd_; int64_t free_cnt = 0; - LOG_INFO("tmp file destroy start", KR(ret), K(fd_), KPC(this)); + LOG_INFO("tmp file release_resource start", KR(ret), K(fd_), KPC(this)); if (cached_page_nums_ > 0) { uint32_t cur_page_id = begin_page_id_; @@ -360,7 +360,7 @@ int ObSharedNothingTmpFile::destroy() ret = OB_ERR_UNEXPECTED; LOG_WARN("begin page virtual id is invalid", KR(ret), K(fd_), K(begin_page_virtual_id_)); } else if (OB_FAIL(wbp_->free_page(fd_, cur_page_id, ObTmpFilePageUniqKey(begin_page_virtual_id_), next_page_id))) { - LOG_WARN("fail to free page", KR(ret), K(fd_), K(cur_page_id), K(begin_page_virtual_id_)); + LOG_ERROR("fail to free page", KR(ret), K(fd_), K(cur_page_id), K(begin_page_virtual_id_)); } else { free_cnt++; cur_page_id = next_page_id; @@ -369,22 +369,16 @@ int ObSharedNothingTmpFile::destroy() } } if (OB_SUCC(ret) && cached_page_nums_ != free_cnt) { - LOG_ERROR("tmp file destroy, cached_page_nums_ and free_cnt are not equal", KR(ret), K(fd_), K(free_cnt), KPC(this)); + LOG_ERROR("tmp file release resource, cached_page_nums_ and free_cnt are not equal", KR(ret), K(fd_), K(free_cnt), KPC(this)); } - LOG_INFO("tmp file destroy, free wbp page phase over", KR(ret), K(fd_), KPC(this)); + LOG_INFO("tmp file release resource, free wbp page phase over", KR(ret), K(fd_), KPC(this)); if (FAILEDx(meta_tree_.clear(truncated_offset_, file_size_))) { - LOG_WARN("fail to clear", KR(ret), K(fd_), K(truncated_offset_), K(file_size_)); + LOG_ERROR("fail to clear meta tree", KR(ret), K(fd_), K(truncated_offset_), K(file_size_)); } - LOG_INFO("tmp file destroy, meta_tree_ clear phase over", KR(ret), K(fd_), KPC(this)); - - if (OB_SUCC(ret)) { - reset(); - } - - LOG_INFO("tmp file destroy over", KR(ret), "fd", fd_backup); + LOG_INFO("tmp file release resource over", KR(ret), "fd", fd_); return ret; } diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h index 9e8c2a0cb..0b9e6d789 100644 --- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h +++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h @@ -216,7 +216,7 @@ public: ObIAllocator *wbp_index_cache_bkt_allocator, ObTmpFilePageCacheController *page_cache_controller, const char* label); - int destroy(); + int release_resource(); void reset(); bool can_remove(); bool is_deleting(); diff --git a/src/storage/tmp_file/ob_sn_tmp_file_manager.cpp b/src/storage/tmp_file/ob_sn_tmp_file_manager.cpp index c3ca15a28..d733f37d0 100644 --- a/src/storage/tmp_file/ob_sn_tmp_file_manager.cpp +++ b/src/storage/tmp_file/ob_sn_tmp_file_manager.cpp @@ -19,6 +19,9 @@ namespace oceanbase { namespace tmp_file { +int64_t ObSNTenantTmpFileManager::current_fd_ = ObTmpFileGlobal::INVALID_TMP_FILE_FD; +int64_t ObSNTenantTmpFileManager::current_dir_id_ = ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID; + ObSNTenantTmpFileManager::ObSNTenantTmpFileManager() : is_inited_(false), tenant_id_(OB_INVALID_TENANT_ID), @@ -30,9 +33,7 @@ ObSNTenantTmpFileManager::ObSNTenantTmpFileManager() wbp_index_cache_bucket_allocator_(), files_(), tmp_file_block_manager_(), - page_cache_controller_(tmp_file_block_manager_), - current_fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD), - current_dir_id_(ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID) + page_cache_controller_(tmp_file_block_manager_) { } @@ -111,18 +112,42 @@ void ObSNTenantTmpFileManager::wait() void ObSNTenantTmpFileManager::destroy() { - last_access_tenant_config_ts_ = -1; - last_meta_mem_limit_ = META_DEFAULT_LIMIT; - page_cache_controller_.destroy(); - files_.destroy(); - tmp_file_block_manager_.destroy(); - tmp_file_allocator_.reset(); - callback_allocator_.reset(); - wbp_index_cache_allocator_.reset(); - wbp_index_cache_bucket_allocator_.reset(); - is_inited_ = false; - current_fd_ = ObTmpFileGlobal::INVALID_TMP_FILE_FD; - current_dir_id_ = ObTmpFileGlobal::INVALID_TMP_FILE_DIR_ID; + if (is_inited_) { + is_inited_ = false; + last_access_tenant_config_ts_ = -1; + last_meta_mem_limit_ = META_DEFAULT_LIMIT; + page_cache_controller_.destroy(); + int64_t curr_file_cnt = files_.count(); + if (OB_UNLIKELY(curr_file_cnt > 0)) { + int ret = OB_SUCCESS; + TmpFileMap::BlurredIterator iter(files_); + while (OB_SUCC(ret)) { + ObTmpFileKey unused_key(ObTmpFileGlobal::INVALID_TMP_FILE_FD); + ObTmpFileHandle handle; + if (OB_FAIL(iter.next(unused_key, handle))) { + if (OB_ITER_END == ret) { + ret = OB_SUCCESS; + break; + } else { + LOG_WARN("fail to get next tmp file", KR(ret), K(tenant_id_)); + } + } else { + LOG_ERROR("the tmp file has not been removed when tmp file mgr is destroying", KPC(handle.get())); + } + } // end while + + int64_t new_file_cnt = files_.count(); + if (OB_UNLIKELY(new_file_cnt != curr_file_cnt)) { + LOG_ERROR("there are some operation for tmp files when tmp file mgr is destroying", K(tenant_id_), K(curr_file_cnt)); + } + } + files_.destroy(); + tmp_file_block_manager_.destroy(); + tmp_file_allocator_.reset(); + callback_allocator_.reset(); + wbp_index_cache_allocator_.reset(); + wbp_index_cache_bucket_allocator_.reset(); + } LOG_INFO("ObSNTenantTmpFileManager destroy", K(tenant_id_), KP(this)); } @@ -220,8 +245,12 @@ int ObSNTenantTmpFileManager::remove(const int64_t fd) usleep(100 * 1000); // 100ms } } - tmp_file_handle.reset(); - tmp_file_allocator_.free(tmp_file); + if (OB_FAIL(tmp_file->release_resource())) { + LOG_ERROR("fail to release resource", KR(ret), KP(tmp_file), KPC(tmp_file), K(lbt())); + } else { + tmp_file_handle.reset(); + tmp_file_allocator_.free(tmp_file); + } } LOG_INFO("remove a tmp file over", KR(ret), K(start_remove_ts), K(fd), K(lbt())); diff --git a/src/storage/tmp_file/ob_sn_tmp_file_manager.h b/src/storage/tmp_file/ob_sn_tmp_file_manager.h index 119bfc56f..70e61457c 100644 --- a/src/storage/tmp_file/ob_sn_tmp_file_manager.h +++ b/src/storage/tmp_file/ob_sn_tmp_file_manager.h @@ -56,7 +56,7 @@ public: void destroy(); int alloc_dir(int64_t &dir_id); - int open(int64_t &fd, const int64_t &dir_id, const char* const label = nullptr); + int open(int64_t &fd, const int64_t &dir_id, const char* const label); int remove(const int64_t fd); void refresh_meta_memory_limit(); @@ -116,8 +116,8 @@ private: ObTmpFileBlockManager tmp_file_block_manager_; ObTmpFilePageCacheController page_cache_controller_; - int64_t current_fd_; - int64_t current_dir_id_; + static int64_t current_fd_; + static int64_t current_dir_id_; }; } // end namespace tmp_file diff --git a/src/storage/tmp_file/ob_tmp_file_eviction_manager.cpp b/src/storage/tmp_file/ob_tmp_file_eviction_manager.cpp index 7e43812e7..1d608f3bc 100644 --- a/src/storage/tmp_file/ob_tmp_file_eviction_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_eviction_manager.cpp @@ -23,12 +23,37 @@ namespace tmp_file { void ObTmpFileEvictionManager::destroy() { + // int ret = OB_SUCCESS; + // ObSharedNothingTmpFile *file = nullptr; { ObSpinLockGuard guard(meta_list_lock_); + // while (!file_meta_eviction_list_.is_empty()) { + // ObTmpFileHandle file_handle; + // if (OB_ISNULL(file = &file_meta_eviction_list_.remove_first()->file_)) { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("file is null", KR(ret)); + // } else if (OB_FAIL(file_handle.init(file))) { + // LOG_WARN("fail to init file handle", KR(ret), KP(file)); + // } else { + // file->dec_ref_cnt(); + // } + // } file_meta_eviction_list_.reset(); } + { ObSpinLockGuard guard(data_list_lock_); + // while (!file_data_eviction_list_.is_empty()) { + // ObTmpFileHandle file_handle; + // if (OB_ISNULL(file = &file_data_eviction_list_.remove_first()->file_)) { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("file is null", KR(ret)); + // } else if (OB_FAIL(file_handle.init(file))) { + // LOG_WARN("fail to init file handle", KR(ret), KP(file)); + // } else { + // file->dec_ref_cnt(); + // } + // } file_data_eviction_list_.reset(); } } diff --git a/src/storage/tmp_file/ob_tmp_file_flush_priority_manager.cpp b/src/storage/tmp_file/ob_tmp_file_flush_priority_manager.cpp index 5c4567138..15ca26500 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_priority_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_priority_manager.cpp @@ -42,13 +42,37 @@ int ObTmpFileFlushPriorityManager::init() void ObTmpFileFlushPriorityManager::destroy() { + // int ret = OB_SUCCESS; + // ObSharedNothingTmpFile *file = nullptr; is_inited_ = false; for (int64_t i = 0; i < FileList::MAX; i++) { ObSpinLockGuard guard(data_list_locks_[i]); + // while (!data_flush_lists_[i].is_empty()) { + // ObTmpFileHandle file_handle; + // if (OB_ISNULL(file = &data_flush_lists_[i].remove_first()->file_)) { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("file is null", KR(ret)); + // } else if (OB_FAIL(file_handle.init(file))) { + // LOG_WARN("fail to init file handle", KR(ret), KP(file)); + // } else { + // file->dec_ref_cnt(); + // } + // } data_flush_lists_[i].reset(); } for (int64_t i = 0; i < FileList::MAX; i++) { ObSpinLockGuard guard(meta_list_locks_[i]); + // while (!meta_flush_lists_[i].is_empty()) { + // ObTmpFileHandle file_handle; + // if (OB_ISNULL(file = &meta_flush_lists_[i].remove_first()->file_)) { + // ret = OB_ERR_UNEXPECTED; + // LOG_WARN("file is null", KR(ret)); + // } else if (OB_FAIL(file_handle.init(file))) { + // LOG_WARN("fail to init file handle", KR(ret), KP(file)); + // } else { + // file->dec_ref_cnt(); + // } + // } meta_flush_lists_[i].reset(); } } diff --git a/src/storage/tmp_file/ob_tmp_file_manager.cpp b/src/storage/tmp_file/ob_tmp_file_manager.cpp index 7ff7f4323..1d4795e38 100644 --- a/src/storage/tmp_file/ob_tmp_file_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_manager.cpp @@ -30,19 +30,6 @@ int ObTenantTmpFileManager::mtl_init(ObTenantTmpFileManager *&manager) return ret; } -ObTenantTmpFileManager &ObTenantTmpFileManager::get_instance() -{ - int ret = OB_SUCCESS; - ObTenantTmpFileManager *tmp_file_manager = MTL(tmp_file::ObTenantTmpFileManager *); - if (OB_ISNULL(tmp_file_manager)) { - ret = OB_ERR_UNEXPECTED; - LOG_ERROR("ObTenantTmpFileManager is null, please check whether MTL module is normal", KR(ret)); - ob_abort(); - } - - return *tmp_file_manager; -} - int ObTenantTmpFileManager::init() { int ret = OB_SUCCESS; @@ -162,8 +149,8 @@ int ObTenantTmpFileManager::open(int64_t &fd, const int64_t &dir_id, const char* } #endif } else { - if (OB_FAIL(sn_file_manager_.open(fd, dir_id))) { - LOG_WARN("fail to open file in sn tmp file manager", KR(ret), K(fd), K(dir_id)); + if (OB_FAIL(sn_file_manager_.open(fd, dir_id, label))) { + LOG_WARN("fail to open file in sn tmp file manager", KR(ret), K(fd), K(dir_id), KP(label)); } } return ret; @@ -390,5 +377,338 @@ int ObTenantTmpFileManager::get_tmp_file_info(const int64_t fd, ObSNTmpFileInfo return ret; } +ObTenantTmpFileManagerWithMTLSwitch &ObTenantTmpFileManagerWithMTLSwitch::get_instance() +{ + static ObTenantTmpFileManagerWithMTLSwitch mgr; + + return mgr; +} + +int ObTenantTmpFileManagerWithMTLSwitch::alloc_dir(const uint64_t tenant_id, int64_t &dir_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->alloc_dir(dir_id))) { + LOG_WARN("fail to alloc dir", KR(ret), K(tenant_id)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::open(const uint64_t tenant_id, int64_t &fd, const int64_t &dir_id) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->open(fd, dir_id))) { + LOG_WARN("fail to open", KR(ret), K(tenant_id)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::remove(const uint64_t tenant_id, const int64_t fd) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->remove(fd))) { + LOG_WARN("fail to remove", KR(ret), K(tenant_id)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::aio_read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->aio_read(io_info, io_handle))) { + LOG_WARN("fail to aio read", KR(ret), K(tenant_id), K(io_info)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::aio_pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, + const int64_t offset, ObTmpFileIOHandle &io_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->aio_pread(io_info, offset, io_handle))) { + LOG_WARN("fail to aio pread", KR(ret), K(tenant_id), K(io_info), K(offset)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->read(io_info, io_handle))) { + LOG_WARN("fail to read", KR(ret), K(tenant_id), K(io_info)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->pread(io_info, offset, io_handle))) { + LOG_WARN("fail to pread", KR(ret), K(tenant_id), K(io_info), K(offset)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::aio_write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->aio_write(io_info, io_handle))) { + LOG_WARN("fail to aio write", KR(ret), K(tenant_id), K(io_info)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->write(io_info))) { + LOG_WARN("fail to write", KR(ret), K(tenant_id), K(io_info)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::truncate(const uint64_t tenant_id, const int64_t fd, const int64_t offset) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->truncate(fd, offset))) { + LOG_WARN("fail to truncate", KR(ret), K(tenant_id), K(fd), K(offset)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_size(const uint64_t tenant_id, const int64_t fd, int64_t &file_size) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->get_tmp_file_size(fd, file_size))) { + LOG_WARN("fail to get tmp file size", KR(ret), K(tenant_id), K(fd)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_fds(const uint64_t tenant_id, ObIArray &fd_arr) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->get_tmp_file_fds(fd_arr))) { + LOG_WARN("fail to get tmp file fds", KR(ret), K(tenant_id)); + } + } + return ret; +} + +int ObTenantTmpFileManagerWithMTLSwitch::get_tmp_file_info(const uint64_t tenant_id, const int64_t fd, ObSNTmpFileInfo &tmp_file_info) +{ + int ret = OB_SUCCESS; + if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id) || is_virtual_tenant_id(tenant_id))) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid argument", KR(ret), K(tenant_id)); + } else { + MAKE_TENANT_SWITCH_SCOPE_GUARD(guard); + if (tenant_id != MTL_ID()) { + if (OB_FAIL(guard.switch_to(tenant_id))) { + LOG_WARN("fail to switch tenant", KR(ret), K(tenant_id)); + } + } + ObTenantTmpFileManager* tmp_file_mgr = MTL(ObTenantTmpFileManager*); + if (OB_FAIL(ret)) { + } else if (OB_ISNULL(tmp_file_mgr)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tmp file manager is null", KR(ret), K(tenant_id)); + } else if (OB_FAIL(tmp_file_mgr->get_tmp_file_info(fd, tmp_file_info))) { + LOG_WARN("fail to get tmp file info", KR(ret), K(tenant_id), K(fd)); + } + } + return ret; +} + } // end namespace tmp_file } // end namespace oceanbase diff --git a/src/storage/tmp_file/ob_tmp_file_manager.h b/src/storage/tmp_file/ob_tmp_file_manager.h index 9ad6c56f4..a1fda02a3 100644 --- a/src/storage/tmp_file/ob_tmp_file_manager.h +++ b/src/storage/tmp_file/ob_tmp_file_manager.h @@ -29,7 +29,6 @@ public: ObTenantTmpFileManager(): is_inited_(false) {} ~ObTenantTmpFileManager() { destroy(); } static int mtl_init(ObTenantTmpFileManager *&manager); - static ObTenantTmpFileManager &get_instance(); ObSNTenantTmpFileManager &get_sn_file_manager() { return sn_file_manager_; } blocksstable::ObSSTenantTmpFileManager &get_ss_file_manager() { return ss_file_manager_; } int init(); @@ -65,7 +64,32 @@ private: blocksstable::ObSSTenantTmpFileManager ss_file_manager_; }; -#define FILE_MANAGER_INSTANCE_V2 (::oceanbase::tmp_file::ObTenantTmpFileManager::get_instance()) +class ObTenantTmpFileManagerWithMTLSwitch final +{ +public: + static ObTenantTmpFileManagerWithMTLSwitch &get_instance(); + int alloc_dir(const uint64_t tenant_id, int64_t &dir_id); + int open(const uint64_t tenant_id, int64_t &fd, const int64_t &dir_id); + int remove(const uint64_t tenant_id, const int64_t fd); + +public: + int aio_read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle); + int aio_pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle); + int read(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle); + int pread(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, const int64_t offset, ObTmpFileIOHandle &io_handle); + // NOTE: + // only support append write. + int aio_write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info, ObTmpFileIOHandle &io_handle); + // NOTE: + // only support append write. + int write(const uint64_t tenant_id, const ObTmpFileIOInfo &io_info); + int truncate(const uint64_t tenant_id, const int64_t fd, const int64_t offset); + int get_tmp_file_size(const uint64_t tenant_id, const int64_t fd, int64_t &file_size); + int get_tmp_file_fds(const uint64_t tenant_id, ObIArray &fd_arr); + int get_tmp_file_info(const uint64_t tenant_id, const int64_t fd, ObSNTmpFileInfo &tmp_file_info); +}; + +#define FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH (::oceanbase::tmp_file::ObTenantTmpFileManagerWithMTLSwitch::get_instance()) } // end namespace tmp_file } // end namespace oceanbase diff --git a/src/storage/tmp_file/ob_tmp_file_write_buffer_index_cache.cpp b/src/storage/tmp_file/ob_tmp_file_write_buffer_index_cache.cpp index 680e5e580..8729b11b6 100644 --- a/src/storage/tmp_file/ob_tmp_file_write_buffer_index_cache.cpp +++ b/src/storage/tmp_file/ob_tmp_file_write_buffer_index_cache.cpp @@ -23,7 +23,7 @@ namespace oceanbase namespace tmp_file { ObTmpFileWBPIndexCache::ObTmpFileWBPIndexCache() : - ObTmpFileCircleArray(), bucket_array_allocator_(), bucket_allocator_(), + ObTmpFileCircleArray(), bucket_array_allocator_(nullptr), bucket_allocator_(nullptr), page_buckets_(nullptr) , fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD), wbp_(nullptr), max_bucket_array_capacity_(MAX_BUCKET_ARRAY_CAPACITY) {} diff --git a/unittest/sql/engine/basic/test_chunk_datum_store.cpp b/unittest/sql/engine/basic/test_chunk_datum_store.cpp index d7ae098cc..f6820c3c6 100644 --- a/unittest/sql/engine/basic/test_chunk_datum_store.cpp +++ b/unittest/sql/engine/basic/test_chunk_datum_store.cpp @@ -906,9 +906,9 @@ TEST_F(TestChunkDatumStore, test_only_disk_data) int64_t rows = round * cnt; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkDatumStore rs("TEST"); - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkDatumStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(1L << 30); // disk data CALL(append_rows, rs, cnt); @@ -933,9 +933,9 @@ TEST_F(TestChunkDatumStore, test_only_disk_data1) int64_t rows = round * cnt; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkDatumStore rs("TEST"); - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkDatumStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(1L << 30); // disk data CALL(append_rows, rs, cnt); @@ -994,10 +994,11 @@ TEST_F(TestChunkDatumStore, test_append_block) //recv ObChunkDatumStore rs2("TEST"); - rs2.alloc_dir_id(); ObChunkDatumStore::Block *block2 = reinterpret_cast(mem2); ret = rs2.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ret = rs2.alloc_dir_id(); + ASSERT_EQ(OB_SUCCESS, ret); for (int64_t i = 0; OB_SUCC(ret) && i < 100; ++i) { ret = rs2.append_block(block->get_buffer()->data(), block->get_buffer()->data_size(), true); } diff --git a/unittest/sql/engine/basic/test_chunk_row_store.cpp b/unittest/sql/engine/basic/test_chunk_row_store.cpp index 72b13ab54..63688424e 100644 --- a/unittest/sql/engine/basic/test_chunk_row_store.cpp +++ b/unittest/sql/engine/basic/test_chunk_row_store.cpp @@ -210,13 +210,13 @@ public: { ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, 2 << 20); ObChunkRowStore rs(&alloc); - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); int64_t v = 0; int64_t i; int64_t begin = ObTimeUtil::current_time(); int ret = OB_SUCCESS; ret = rs.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_block_size(block_size); begin = ObTimeUtil::current_time(); for (i = 0; i < rows; i++) { @@ -337,10 +337,10 @@ TEST_F(TestChunkRowStore, multi_iter) int64_t i = 0; int64_t j = 0; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ret = rs.init(1 << 20, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); LOG_INFO("starting basic test: append 3000 rows"); CALL(append_rows, rs, total); // approximate 1MB, no need to dump @@ -380,9 +380,9 @@ TEST_F(TestChunkRowStore, multi_iter) TEST_F(TestChunkRowStore, keep_projector0) { ObChunkRowStore rs(NULL); - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); - ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID, + ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID, common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); const int64_t OBJ_CNT = 3; ObObj objs[OBJ_CNT]; ObNewRow r; @@ -438,10 +438,10 @@ TEST_F(TestChunkRowStore, keep_projector0) TEST_F(TestChunkRowStore, keep_projector2) { ObChunkRowStore rs(NULL); - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); - ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID, + ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID, common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true, ObChunkRowStore::STORE_MODE::FULL)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); const int64_t OBJ_CNT = 3; ObObj objs[OBJ_CNT]; ObNewRow r; @@ -495,15 +495,15 @@ TEST_F(TestChunkRowStore, keep_projector2) TEST_F(TestChunkRowStore, keep_projector2_with_copy) { ObChunkRowStore rs(NULL); + ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID, + common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true, + ObChunkRowStore::STORE_MODE::FULL)); ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); - ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID, - common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true, - ObChunkRowStore::STORE_MODE::FULL)); ObChunkRowStore rs2(NULL); - ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id()); - ASSERT_EQ(OB_SUCCESS, rs2.init(100 << 20, OB_SERVER_TENANT_ID, ObCtxIds::DEFAULT_CTX_ID, + ASSERT_EQ(OB_SUCCESS, rs2.init(100 << 20, tenant_id_, ObCtxIds::DEFAULT_CTX_ID, common::ObModIds::OB_SQL_CHUNK_ROW_STORE, true, ObChunkRowStore::STORE_MODE::FULL)); + ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id()); const int64_t OBJ_CNT = 3; ObObj objs[OBJ_CNT]; ObNewRow r; @@ -555,11 +555,11 @@ TEST_F(TestChunkRowStore, basic2) { int ret = OB_SUCCESS; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; //mem limit 5M ret = rs.init(5L << 20, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); CALL(append_rows, rs, 100000); ASSERT_GT(rs.get_mem_hold(), 0); ASSERT_GT(rs.get_file_size(), 0); @@ -577,12 +577,12 @@ TEST_F(TestChunkRowStore, chunk_iterator) { int ret = OB_SUCCESS; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::ChunkIterator chunk_it; ObChunkRowStore::RowIterator it; //mem limit 5M ret = rs.init(5L << 20, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); CALL(append_rows, rs, 100000); ASSERT_GT(rs.get_mem_hold(), 0); ASSERT_GT(rs.get_file_size(), 0); @@ -636,13 +636,13 @@ TEST_F(TestChunkRowStore, test_copy_row) int ret = OB_SUCCESS; int64_t rows = 1000; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; const ObChunkRowStore::StoredRow *sr; LOG_INFO("starting mem_perf test: append rows", K(rows)); int64_t begin = ObTimeUtil::current_time(); ret = rs.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); CALL(append_rows, rs, rows); ASSERT_EQ(OB_SUCCESS, rs.begin(it)); ASSERT_EQ(OB_SUCCESS, it.get_next_row(sr)); @@ -653,12 +653,12 @@ TEST_F(TestChunkRowStore, mem_perf) int ret = OB_SUCCESS; int64_t rows = 2000000; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; LOG_INFO("starting mem_perf test: append rows", K(rows)); int64_t begin = ObTimeUtil::current_time(); ret = rs.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); CALL(append_rows, rs, rows); LOG_WARN("write time:", K(rows), K(ObTimeUtil::current_time() - begin)); CALL(verify_n_rows, rs, it, 10000, true); @@ -748,12 +748,12 @@ TEST_F(TestOARowStore, disk_time_cmp) int ret = OB_SUCCESS; int64_t rows = 2000000; ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); int64_t v = 0; int64_t i; int64_t begin = ObTimeUtil::current_time(); ret = rs.init(1 << 20, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); begin = ObTimeUtil::current_time(); for (int64_t i = 0; i < rows; i++) { ObNewRow &row = gen_row(i); @@ -812,9 +812,9 @@ TEST_F(TestChunkRowStore, disk) int64_t rows = round * 10000; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(100L << 20); for (int64_t i = 0; i < round; i++) { if (i == round / 2) { @@ -849,9 +849,9 @@ TEST_F(TestChunkRowStore, disk_with_chunk) int64_t rows = round * cnt; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(1L << 20); for (int64_t i = 0; i < round; i++) { //if (i == round / 2) { @@ -920,9 +920,9 @@ TEST_F(TestChunkRowStore, disk_with_chunk) // LOG_INFO("starting dump mem test: append rows", K(500000)); // int ret = OB_SUCCESS; // ObChunkRowStore rs; -// ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); // ObChunkRowStore::Iterator it; // ret = rs.init(0, tenant_id_, ctx_id_, mod_id_); +// ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); // CALL(append_rows, rs, 500000); // int64_t avg_row_size = rs.get_mem_hold() / rs.get_row_cnt(); // LOG_WARN("average row size", K(avg_row_size)); @@ -1005,7 +1005,6 @@ TEST_F(TestChunkRowStore, test_add_block) int ret = OB_SUCCESS; //send ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Block *block; ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, 2 << 20); @@ -1014,6 +1013,7 @@ TEST_F(TestChunkRowStore, test_add_block) ret = rs.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); int64_t min_size = rs.min_blk_size(row_size); void *mem = alloc.alloc(min_size); @@ -1037,10 +1037,10 @@ TEST_F(TestChunkRowStore, test_add_block) //recv ObChunkRowStore rs2; - ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id()); ObChunkRowStore::Block *block2 = reinterpret_cast(mem2); ret = rs2.init(0, tenant_id_, ctx_id_, label_); ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(OB_SUCCESS, rs2.alloc_dir_id()); ret = rs2.add_block(block2, true); ASSERT_EQ(OB_SUCCESS, ret); @@ -1064,10 +1064,10 @@ TEST_F(TestChunkRowStore, row_extend_row) int64_t rows = round * 10000; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_, true, ObChunkRowStore::WITHOUT_PROJECTOR, 8)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); LOG_INFO("starting basic test: append 3000 rows"); int64_t ret = OB_SUCCESS; int64_t base = rs.get_row_cnt(); @@ -1113,9 +1113,9 @@ TEST_F(TestChunkRowStore, test_both_disk_and_memory) int64_t rows = round * cnt; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(1L << 30); // disk data CALL(append_rows, rs, cnt); @@ -1140,9 +1140,9 @@ TEST_F(TestChunkRowStore, test_only_disk_data) int64_t rows = round * cnt; LOG_INFO("starting write disk test: append rows", K(rows)); ObChunkRowStore rs; - ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); ObChunkRowStore::Iterator it; ASSERT_EQ(OB_SUCCESS, rs.init(0, tenant_id_, ctx_id_, label_)); + ASSERT_EQ(OB_SUCCESS, rs.alloc_dir_id()); rs.set_mem_limit(1L << 30); // disk data CALL(append_rows, rs, cnt); diff --git a/unittest/sql/engine/basic/test_ra_row_store_projector.cpp b/unittest/sql/engine/basic/test_ra_row_store_projector.cpp index f57e7d0c9..3c1f7a630 100644 --- a/unittest/sql/engine/basic/test_ra_row_store_projector.cpp +++ b/unittest/sql/engine/basic/test_ra_row_store_projector.cpp @@ -24,7 +24,7 @@ using namespace common; TEST(RARowStore, keep_projector) { ObRARowStore rs(NULL, true); - ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20)); + ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SYS_TENANT_ID)); const int64_t OBJ_CNT = 3; ObObj objs[OBJ_CNT]; ObNewRow r; @@ -80,7 +80,7 @@ TEST(RARowStore, alloc_project_fail) { ObEmptyAlloc alloc; ObRARowStore rs(&alloc, true); - ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20)); + ASSERT_EQ(OB_SUCCESS, rs.init(100 << 20, OB_SYS_TENANT_ID)); const int64_t OBJ_CNT = 3; ObObj objs[OBJ_CNT]; ObNewRow r; diff --git a/unittest/storage/test_parallel_external_sort.cpp b/unittest/storage/test_parallel_external_sort.cpp index f8ec2b96b..e91f55ab6 100644 --- a/unittest/storage/test_parallel_external_sort.cpp +++ b/unittest/storage/test_parallel_external_sort.cpp @@ -371,7 +371,7 @@ int TestParallelExternalSort::build_reader(const ObVector &items, co ObFragmentWriterV2 writer; int64_t dir_id = -1; std::sort(items.begin(), items.end(), compare); - if (OB_FAIL(FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id))) { + if (OB_FAIL(FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id))) { COMMON_LOG(WARN, "fail to allocate file directory", K(ret)); } else if (OB_FAIL(writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id))) { COMMON_LOG(WARN, "fail to open writer", K(ret)); @@ -682,7 +682,7 @@ TEST_F(TestParallelExternalSort, test_writer) int ret = OB_SUCCESS; int64_t dir_id = -1; - ret = FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id); + ret = FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id); ASSERT_EQ(OB_SUCCESS, ret); // single macro buffer, total write bytes is less than single macro buffer length ret = writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id); @@ -739,7 +739,7 @@ TEST_F(TestParallelExternalSort, test_reader) int ret = OB_SUCCESS; int64_t dir_id = -1; - ret = FILE_MANAGER_INSTANCE_V2.alloc_dir(dir_id); + ret = FILE_MANAGER_INSTANCE_WITH_MTL_SWITCH.alloc_dir(OB_SYS_TENANT_ID, dir_id); ASSERT_EQ(OB_SUCCESS, ret); // single macro buffer, total write bytes is less than single macro buffer length ret = writer.open(buf_cap, expire_timestamp, OB_SYS_TENANT_ID, dir_id);