diff --git a/src/storage/blocksstable/ob_tmp_file.cpp b/src/storage/blocksstable/ob_tmp_file.cpp index 355b53cb5..80a2f9841 100644 --- a/src/storage/blocksstable/ob_tmp_file.cpp +++ b/src/storage/blocksstable/ob_tmp_file.cpp @@ -24,7 +24,9 @@ namespace blocksstable { ObTmpFileIOInfo::ObTmpFileIOInfo() - : fd_(0), dir_id_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS), tenant_id_(OB_INVALID_TENANT_ID), buf_(NULL), io_desc_() + : fd_(0), dir_id_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS), + tenant_id_(OB_INVALID_TENANT_ID), buf_(NULL), io_desc_(), + disable_page_cache_(false) { } @@ -61,6 +63,7 @@ ObTmpFileIOHandle::ObTmpFileIOHandle() is_read_(false), has_wait_(false), is_finished_(false), + disable_page_cache_(false), ret_code_(OB_SUCCESS), expect_read_size_(0), last_read_offset_(-1), @@ -86,7 +89,8 @@ int ObTmpFileIOHandle::prepare_read( char *read_buf, int64_t fd, int64_t dir_id, - uint64_t tenant_id) + uint64_t tenant_id, + bool disable_page_cache) { int ret = OB_SUCCESS; if (OB_ISNULL(read_buf)) { @@ -103,6 +107,7 @@ int ObTmpFileIOHandle::prepare_read( expect_read_size_ = read_size; last_read_offset_ = read_offset; io_flag_ = io_flag; + disable_page_cache_ = disable_page_cache; if (last_fd_ != fd_) { last_fd_ = fd_; last_extent_id_ = 0; @@ -812,7 +817,8 @@ int ObTmpFile::aio_read_without_lock(const ObTmpFileIOInfo &io_info, io_info.buf_, file_meta_.get_fd(), file_meta_.get_dir_id(), - io_info.tenant_id_))){ + io_info.tenant_id_, + io_info.disable_page_cache_))) { STORAGE_LOG(WARN, "fail to prepare read io handle", K(ret), K(io_info), K(offset)); } else if (OB_UNLIKELY(io_info.size_ > 0 && offset >= tmp->get_global_end())) { ret = OB_ITER_END; diff --git a/src/storage/blocksstable/ob_tmp_file.h b/src/storage/blocksstable/ob_tmp_file.h index 50ee560f6..da65938f4 100644 --- a/src/storage/blocksstable/ob_tmp_file.h +++ b/src/storage/blocksstable/ob_tmp_file.h @@ -43,6 +43,7 @@ public: uint64_t tenant_id_; char *buf_; common::ObIOFlag io_desc_; + bool disable_page_cache_; }; class ObTmpFileIOHandle final @@ -97,6 +98,7 @@ public: ~ObTmpFileIOHandle(); OB_INLINE char *get_buffer() { return buf_; } OB_INLINE int64_t get_data_size() { return size_; } + OB_INLINE bool is_disable_page_cache() const { return disable_page_cache_; } int prepare_read( const int64_t read_size, const int64_t read_offset, @@ -104,7 +106,8 @@ public: char *read_buf, int64_t fd, int64_t dir_id, - uint64_t tenant_id); + uint64_t tenant_id, + const bool disable_page_cache); int prepare_write( char *write_buf, const int64_t write_size, @@ -160,6 +163,7 @@ private: bool is_read_; bool has_wait_; bool is_finished_; + bool disable_page_cache_; int ret_code_; int64_t expect_read_size_; int64_t last_read_offset_; // only for more than 8MB read. diff --git a/src/storage/blocksstable/ob_tmp_file_cache.cpp b/src/storage/blocksstable/ob_tmp_file_cache.cpp index 5ac085ebf..632bc0c95 100644 --- a/src/storage/blocksstable/ob_tmp_file_cache.cpp +++ b/src/storage/blocksstable/ob_tmp_file_cache.cpp @@ -119,6 +119,43 @@ int ObTmpPageCacheValue::deep_copy(char *buf, const int64_t buf_len, ObIKVCacheV return ret; } +int ObTmpPageCache::inner_read_io(const ObTmpBlockIOInfo &io_info, + ObITmpPageIOCallback *callback, + ObMacroBlockHandle ¯o_block_handle) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(read_io(io_info, nullptr, macro_block_handle))) { + if (macro_block_handle.get_io_handle().is_empty()) { + // TODO: After the continuous IO has been optimized, this should + // not happen. + if (OB_FAIL(macro_block_handle.wait())) { + STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret)); + } else if (OB_FAIL(read_io(io_info, nullptr, macro_block_handle))) { + STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); + } + } else { + STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); + } + } + // Avoid double_free with io_handle + if (OB_FAIL(ret) && OB_NOT_NULL(callback) && OB_NOT_NULL(callback->get_allocator())) { + common::ObIAllocator *allocator = callback->get_allocator(); + callback->~ObITmpPageIOCallback(); + allocator->free(callback); + } + return ret; +} + +int ObTmpPageCache::direct_read(const ObTmpBlockIOInfo &info, + ObMacroBlockHandle &mb_handle) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(inner_read_io(info, nullptr, mb_handle))) { + STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); + } + return ret; +} + int ObTmpPageCache::prefetch( const ObTmpPageCacheKey &key, const ObTmpBlockIOInfo &info, @@ -142,22 +179,8 @@ int ObTmpPageCache::prefetch( callback->offset_ = info.offset_; callback->allocator_ = &allocator; callback->key_ = key; - if (OB_FAIL(read_io(info, *callback, mb_handle))) { - if (mb_handle.get_io_handle().is_empty()) { - // TODO: After the continuous IO has been optimized, this should - // not happen. - if (OB_FAIL(mb_handle.wait())) { - STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret)); - } else if (OB_FAIL(read_io(info, *callback, mb_handle))) { - STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); - } - } else { - STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); - } - } - if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle - callback->~ObTmpPageIOCallback(); - allocator.free(callback); + if (OB_FAIL(inner_read_io(info, callback, mb_handle))) { + STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); } } } @@ -187,22 +210,8 @@ int ObTmpPageCache::prefetch( callback->allocator_ = &allocator; if (OB_FAIL(callback->page_io_infos_.assign(page_io_infos))) { STORAGE_LOG(WARN, "fail to assign page io infos", K(ret), K(page_io_infos.count()), K(info)); - } else if (OB_FAIL(read_io(info, *callback, mb_handle))) { - if (mb_handle.get_io_handle().is_empty()) { - // TODO: After the continuous IO has been optimized, this should - // not happen. - if (OB_FAIL(mb_handle.wait())) { - STORAGE_LOG(WARN, "fail to wait tmp page io", K(ret)); - } else if (OB_FAIL(read_io(info, *callback, mb_handle))) { - STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); - } - } else { - STORAGE_LOG(WARN, "fail to read tmp page from io", K(ret)); - } - } - if (OB_FAIL(ret) && OB_NOT_NULL(callback->get_allocator())) { //Avoid double_free with io_handle - callback->~ObTmpMultiPageIOCallback(); - allocator.free(callback); + } else if (OB_FAIL(inner_read_io(info, callback, mb_handle))) { + STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); } } } @@ -372,7 +381,7 @@ const char *ObTmpPageCache::ObTmpMultiPageIOCallback::get_data() return data_buf_; } -int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback &callback, +int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback *callback, ObMacroBlockHandle &handle) { int ret = OB_SUCCESS; @@ -382,7 +391,7 @@ int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallbac read_info.io_desc_ = io_info.io_desc_; read_info.macro_block_id_ = io_info.macro_block_id_; read_info.io_timeout_ms_ = io_info.io_timeout_ms_; - read_info.io_callback_ = &callback; + read_info.io_callback_ = callback; read_info.offset_ = io_info.offset_; read_info.size_ = io_info.size_; read_info.io_desc_.set_group_id(ObIOModule::TMP_PAGE_CACHE_IO); diff --git a/src/storage/blocksstable/ob_tmp_file_cache.h b/src/storage/blocksstable/ob_tmp_file_cache.h index 2cd4ffa9c..fec6283bf 100644 --- a/src/storage/blocksstable/ob_tmp_file_cache.h +++ b/src/storage/blocksstable/ob_tmp_file_cache.h @@ -104,6 +104,7 @@ public: typedef common::ObKVCache BasePageCache; static ObTmpPageCache &get_instance(); int init(const char *cache_name, const int64_t priority); + int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle); int prefetch( const ObTmpPageCacheKey &key, const ObTmpBlockIOInfo &info, @@ -142,7 +143,7 @@ public: { public: ObTmpPageIOCallback(); - ~ObTmpPageIOCallback(); + ~ObTmpPageIOCallback() override; int64_t size() const override; int inner_process(const char *data_buffer, const int64_t size) override; const char *get_data() override; @@ -156,7 +157,7 @@ public: { public: ObTmpMultiPageIOCallback(); - ~ObTmpMultiPageIOCallback(); + ~ObTmpMultiPageIOCallback() override; int64_t size() const override; int inner_process(const char *data_buffer, const int64_t size) override; const char *get_data() override; @@ -169,8 +170,12 @@ public: private: ObTmpPageCache(); ~ObTmpPageCache(); - int read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback &callback, - ObMacroBlockHandle &handle); + int inner_read_io(const ObTmpBlockIOInfo &io_info, + ObITmpPageIOCallback *callback, + ObMacroBlockHandle &handle); + int read_io(const ObTmpBlockIOInfo &io_info, + ObITmpPageIOCallback *callback, + ObMacroBlockHandle &handle); private: DISALLOW_COPY_AND_ASSIGN(ObTmpPageCache); diff --git a/src/storage/blocksstable/ob_tmp_file_store.cpp b/src/storage/blocksstable/ob_tmp_file_store.cpp index 115aa6e85..9d01036b4 100644 --- a/src/storage/blocksstable/ob_tmp_file_store.cpp +++ b/src/storage/blocksstable/ob_tmp_file_store.cpp @@ -1347,7 +1347,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io } if (OB_SUCC(ret)) { - if (page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) { + if (!handle.is_disable_page_cache() && page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) { // merge multi page io into one. ObMacroBlockHandle mb_handle; ObTmpBlockIOInfo info(io_info); @@ -1375,9 +1375,16 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io info.offset_ += ObTmpMacroBlock::get_header_padding(); info.size_ = ObTmpMacroBlock::get_default_page_size(); info.macro_block_id_ = block->get_macro_block_id(); - if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle, io_allocator_))) { - STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret)); + if (handle.is_disable_page_cache()) { + if (OB_FAIL(page_cache_->direct_read(info, mb_handle))) { + STORAGE_LOG(WARN, "fail to direct read tmp page", K(ret)); + } } else { + if (OB_FAIL(page_cache_->prefetch(page_io_infos->at(i).key_, info, mb_handle, io_allocator_))) { + STORAGE_LOG(WARN, "fail to prefetch tmp page", K(ret)); + } + } + if (OB_SUCC(ret)) { char *buf = io_info.buf_ + ObTmpMacroBlock::calculate_offset( page_io_infos->at(i).key_.get_page_id(), page_io_infos->at(i).offset_) - io_info.offset_; ObTmpFileIOHandle::ObIOReadHandle read_handle(mb_handle, buf, page_io_infos->at(i).offset_, diff --git a/unittest/storage/blocksstable/test_tmp_file.cpp b/unittest/storage/blocksstable/test_tmp_file.cpp index b90a327ef..14efdb716 100644 --- a/unittest/storage/blocksstable/test_tmp_file.cpp +++ b/unittest/storage/blocksstable/test_tmp_file.cpp @@ -724,6 +724,85 @@ TEST_F(TestTmpFile, test_big_file) ObTmpFileManager::get_instance().remove(fd); } +TEST_F(TestTmpFile, test_big_file_disable_page_cache) +{ + int ret = OB_SUCCESS; + int64_t dir = -1; + int64_t fd = -1; + const int64_t macro_block_size = OB_SERVER_BLOCK_MGR.get_macro_block_size(); + ObTmpFileIOInfo io_info; + ObTmpFileIOHandle handle; + ret = ObTmpFileManager::get_instance().alloc_dir(dir); + ASSERT_EQ(OB_SUCCESS, ret); + ret = ObTmpFileManager::get_instance().open(fd, dir); + ASSERT_EQ(OB_SUCCESS, ret); + int64_t write_size = macro_block_size * 512; + char *write_buf = (char *)malloc(write_size); + for (int64_t i = 0; i < write_size; ++i) { + write_buf[i] = static_cast(i % 256); + } + char *read_buf = (char *)malloc(write_size); + io_info.fd_ = fd; + io_info.tenant_id_ = 1; + io_info.io_desc_.set_wait_event(2); + io_info.buf_ = write_buf; + io_info.size_ = write_size; + io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS; + io_info.disable_page_cache_ = true; + int64_t write_time = ObTimeUtility::current_time(); + ret = ObTmpFileManager::get_instance().write(io_info); + ASSERT_EQ(OB_SUCCESS, ret); + write_time = ObTimeUtility::current_time() - write_time; + io_info.buf_ = read_buf; + + io_info.size_ = write_size; + ret = ObTmpFileManager::get_instance().aio_read(io_info, handle); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_TRUE(handle.size_ < handle.expect_read_size_); + ASSERT_EQ(OB_SUCCESS, handle.wait()); + ASSERT_EQ(write_size, handle.get_data_size()); + int cmp = memcmp(handle.get_buffer(), write_buf, handle.get_data_size()); + ASSERT_EQ(0, cmp); + + io_info.size_ = macro_block_size; + ret = ObTmpFileManager::get_instance().pread(io_info, 100, handle); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(macro_block_size, handle.get_data_size()); + cmp = memcmp(handle.get_buffer(), write_buf + 100, handle.get_data_size()); + ASSERT_EQ(0, cmp); + + io_info.size_ = write_size; + int64_t read_time = ObTimeUtility::current_time(); + ret = ObTmpFileManager::get_instance().pread(io_info, 0, handle); + read_time = ObTimeUtility::current_time() - read_time; + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(write_size, handle.get_data_size()); + cmp = memcmp(handle.get_buffer(), write_buf, write_size); + ASSERT_EQ(0, cmp); + + io_info.size_ = 200; + ret = ObTmpFileManager::get_instance().pread(io_info, 200, handle); + ASSERT_EQ(OB_SUCCESS, ret); + ASSERT_EQ(200, handle.get_data_size()); + cmp = memcmp(handle.get_buffer(), write_buf + 200, 200); + ASSERT_EQ(0, cmp); + + free(write_buf); + free(read_buf); + + STORAGE_LOG(INFO, "test_big_file"); + STORAGE_LOG(INFO, "io time", K(write_time), K(read_time)); + ObTmpTenantFileStoreHandle store_handle; + OB_TMP_FILE_STORE.get_store(1, store_handle); + store_handle.get_tenant_store()->print_block_usage(); + ObMallocAllocator::get_instance()->print_tenant_memory_usage(1); + ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(1); + ObMallocAllocator::get_instance()->print_tenant_memory_usage(500); + ObMallocAllocator::get_instance()->print_tenant_ctx_memory_usage(500); + + ObTmpFileManager::get_instance().remove(fd); +} + TEST_F(TestTmpFile, test_multi_small_file_single_thread_read_write) { int ret = OB_SUCCESS;