diff --git a/src/storage/blocksstable/ob_tmp_file_cache.cpp b/src/storage/blocksstable/ob_tmp_file_cache.cpp index b4695727ae..72f6f9b3f4 100644 --- a/src/storage/blocksstable/ob_tmp_file_cache.cpp +++ b/src/storage/blocksstable/ob_tmp_file_cache.cpp @@ -147,11 +147,27 @@ int ObTmpPageCache::inner_read_io(const ObTmpBlockIOInfo &io_info, } int ObTmpPageCache::direct_read(const ObTmpBlockIOInfo &info, - ObMacroBlockHandle &mb_handle) + ObMacroBlockHandle &mb_handle, + common::ObIAllocator &allocator) { int ret = OB_SUCCESS; - if (OB_FAIL(inner_read_io(info, nullptr, mb_handle))) { - STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); + void *buf = nullptr; + ObTmpDirectReadPageIOCallback *callback = nullptr; + if (OB_ISNULL(buf = allocator.alloc(sizeof(ObTmpDirectReadPageIOCallback)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "allocate callback memory failed", K(ret)); + } else { + // fill the callback + callback = new (buf) ObTmpDirectReadPageIOCallback; + callback->cache_ = this; + callback->offset_ = info.offset_; + callback->allocator_ = &allocator; + if (OB_FAIL(inner_read_io(info, callback, mb_handle))) { + STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); + } + // There is no need to handle error cases (freeing the memory of the + // callback) because inner_read_io will handle error cases and free the + // memory of the callback. } return ret; } @@ -182,6 +198,9 @@ int ObTmpPageCache::prefetch( if (OB_FAIL(inner_read_io(info, callback, mb_handle))) { STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); } + // There is no need to handle error cases (freeing the memory of the + // callback) because inner_read_io will handle error cases and free the + // memory of the callback. } } return ret; @@ -213,6 +232,9 @@ int ObTmpPageCache::prefetch( } else if (OB_FAIL(inner_read_io(info, callback, mb_handle))) { STORAGE_LOG(WARN, "fail to inner read io", K(ret), K(mb_handle)); } + // There is no need to handle error cases (freeing the memory of the + // callback) because inner_read_io will handle error cases and free the + // memory of the callback. } } return ret; @@ -381,6 +403,37 @@ const char *ObTmpPageCache::ObTmpMultiPageIOCallback::get_data() return data_buf_; } +int64_t ObTmpPageCache::ObTmpDirectReadPageIOCallback::size() const +{ + return sizeof(*this); +} + +const char * ObTmpPageCache::ObTmpDirectReadPageIOCallback::get_data() +{ + return data_buf_; +} + +int ObTmpPageCache::ObTmpDirectReadPageIOCallback::inner_process(const char *data_buffer, const int64_t size) +{ + int ret = OB_SUCCESS; + ObTimeGuard time_guard("ObTmpDirectReadPageIOCallback", 100000); //100ms + if (OB_ISNULL(cache_) || OB_ISNULL(allocator_)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "Invalid tmp page cache callback allocator", KP_(cache), KP_(allocator), K(ret)); + } else if (OB_UNLIKELY(size <= 0 || data_buffer == nullptr)) { + ret = OB_INVALID_DATA; + STORAGE_LOG(WARN, "invalid data buffer size", K(ret), K(size), KP(data_buffer)); + } else if (OB_FAIL(alloc_data_buf(data_buffer, size))) { + STORAGE_LOG(WARN, "Fail to allocate memory, ", K(ret), K(size)); + } else if (FALSE_IT(time_guard.click("alloc_data_buf"))) { + } + if (OB_FAIL(ret) && NULL != allocator_ && NULL != data_buf_) { + allocator_->free(data_buf_); + data_buf_ = NULL; + } + return ret; +} + int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallback *callback, ObMacroBlockHandle &handle) { @@ -391,11 +444,7 @@ int ObTmpPageCache::read_io(const ObTmpBlockIOInfo &io_info, ObITmpPageIOCallbac read_info.io_desc_ = io_info.io_desc_; read_info.macro_block_id_ = io_info.macro_block_id_; read_info.io_timeout_ms_ = io_info.io_timeout_ms_; - if (callback == nullptr) { - read_info.buf_ = io_info.buf_; - } else { - read_info.io_callback_ = callback; - } + read_info.io_callback_ = callback; read_info.offset_ = io_info.offset_; read_info.size_ = io_info.size_; read_info.io_desc_.set_group_id(ObIOModule::TMP_PAGE_CACHE_IO); diff --git a/src/storage/blocksstable/ob_tmp_file_cache.h b/src/storage/blocksstable/ob_tmp_file_cache.h index fec6283bf3..3996e5ecbb 100644 --- a/src/storage/blocksstable/ob_tmp_file_cache.h +++ b/src/storage/blocksstable/ob_tmp_file_cache.h @@ -104,7 +104,7 @@ public: typedef common::ObKVCache BasePageCache; static ObTmpPageCache &get_instance(); int init(const char *cache_name, const int64_t priority); - int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle); + int direct_read(const ObTmpBlockIOInfo &info, ObMacroBlockHandle &mb_handle, common::ObIAllocator &allocator); int prefetch( const ObTmpPageCacheKey &key, const ObTmpBlockIOInfo &info, @@ -167,6 +167,17 @@ public: friend class ObTmpPageCache; common::ObArray page_io_infos_; }; + class ObTmpDirectReadPageIOCallback final : public ObITmpPageIOCallback + { + public: + ObTmpDirectReadPageIOCallback() {} + ~ObTmpDirectReadPageIOCallback() override {} + int64_t size() const override; + int inner_process(const char *data_buffer, const int64_t size) override; + const char *get_data() override; + TO_STRING_KV("callback_type:", "ObTmpDirectReadPageIOCallback", KP_(data_buf)); + DISALLOW_COPY_AND_ASSIGN(ObTmpDirectReadPageIOCallback); + }; private: ObTmpPageCache(); ~ObTmpPageCache(); diff --git a/src/storage/blocksstable/ob_tmp_file_store.cpp b/src/storage/blocksstable/ob_tmp_file_store.cpp index 9d01036b48..382a0e7d2b 100644 --- a/src/storage/blocksstable/ob_tmp_file_store.cpp +++ b/src/storage/blocksstable/ob_tmp_file_store.cpp @@ -1347,18 +1347,25 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io } if (OB_SUCC(ret)) { - if (!handle.is_disable_page_cache() && page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) { + if (page_io_infos->count() > DEFAULT_PAGE_IO_MERGE_RATIO * page_nums) { // merge multi page io into one. ObMacroBlockHandle mb_handle; ObTmpBlockIOInfo info(io_info); - int64_t p_offset = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size()); + const int64_t p_offset = common::lower_align(io_info.offset_, ObTmpMacroBlock::get_default_page_size()); // just skip header and padding. info.offset_ = p_offset + ObTmpMacroBlock::get_header_padding(); info.size_ = page_nums * ObTmpMacroBlock::get_default_page_size(); info.macro_block_id_ = block->get_macro_block_id(); - if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle, io_allocator_))) { - STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret)); + if (handle.is_disable_page_cache()) { + if (OB_FAIL(page_cache_->direct_read(info, mb_handle, io_allocator_))) { + STORAGE_LOG(WARN, "fail to direct read multi page", K(ret)); + } } else { + if (OB_FAIL(page_cache_->prefetch(info, *page_io_infos, mb_handle, io_allocator_))) { + STORAGE_LOG(WARN, "fail to prefetch multi tmp page", K(ret)); + } + } + if (OB_SUCC(ret)) { ObTmpFileIOHandle::ObIOReadHandle read_handle(mb_handle, io_info.buf_, io_info.offset_ - p_offset, io_info.size_); if (OB_FAIL(handle.get_io_handles().push_back(read_handle))) { @@ -1376,7 +1383,7 @@ int ObTmpTenantFileStore::read_page(ObTmpMacroBlock *block, ObTmpBlockIOInfo &io info.size_ = ObTmpMacroBlock::get_default_page_size(); info.macro_block_id_ = block->get_macro_block_id(); if (handle.is_disable_page_cache()) { - if (OB_FAIL(page_cache_->direct_read(info, mb_handle))) { + if (OB_FAIL(page_cache_->direct_read(info, mb_handle, io_allocator_))) { STORAGE_LOG(WARN, "fail to direct read tmp page", K(ret)); } } else { diff --git a/src/storage/blocksstable/ob_tmp_file_store.h b/src/storage/blocksstable/ob_tmp_file_store.h index 419d6a3f37..773403e586 100644 --- a/src/storage/blocksstable/ob_tmp_file_store.h +++ b/src/storage/blocksstable/ob_tmp_file_store.h @@ -111,7 +111,7 @@ struct ObTmpBlockIOInfo final public: ObTmpBlockIOInfo() : block_id_(0), offset_(0), size_(0), io_timeout_ms_(DEFAULT_IO_WAIT_TIME_MS), tenant_id_(0), - buf_(NULL), io_desc_(), macro_block_id_() {} + buf_(NULL), io_desc_(), macro_block_id_() {} ObTmpBlockIOInfo(const int64_t block_id, const int64_t offset, const int64_t size, const uint64_t tenant_id, const MacroBlockId macro_block_id, char *buf, const common::ObIOFlag io_desc) diff --git a/unittest/storage/blocksstable/test_tmp_file.cpp b/unittest/storage/blocksstable/test_tmp_file.cpp index e6a9695752..893147b97a 100644 --- a/unittest/storage/blocksstable/test_tmp_file.cpp +++ b/unittest/storage/blocksstable/test_tmp_file.cpp @@ -680,6 +680,7 @@ TEST_F(TestTmpFile, test_big_file) write_time = ObTimeUtility::current_time() - write_time; io_info.buf_ = read_buf; + // Flush all held block caches to ensure that subsequent read processes will go through I/O. ObKVGlobalCache::get_instance().erase_cache(1, "tmp_block_cache"); io_info.size_ = write_size; @@ -730,7 +731,6 @@ TEST_F(TestTmpFile, test_big_file) ObTmpFileManager::get_instance().remove(fd); } -/* TEST_F(TestTmpFile, test_big_file_disable_page_cache) { int ret = OB_SUCCESS; @@ -762,6 +762,7 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache) write_time = ObTimeUtility::current_time() - write_time; io_info.buf_ = read_buf; + // Flush all held block caches to ensure that subsequent read processes will go through I/O. ObKVGlobalCache::get_instance().erase_cache(1, "tmp_block_cache"); io_info.size_ = write_size; @@ -811,7 +812,6 @@ TEST_F(TestTmpFile, test_big_file_disable_page_cache) ObTmpFileManager::get_instance().remove(fd); } -*/ TEST_F(TestTmpFile, test_multi_small_file_single_thread_read_write) {