From 51783965996d3abf46e940f6f975d3317b8e6757 Mon Sep 17 00:00:00 2001
From: dongb0 <708848999@qq.com>
Date: Tue, 20 Aug 2024 13:18:59 +0000
Subject: [PATCH] fix tmp file not insert into flush prio mgr when there are flush tasks not completed

---
 .../mtlenv/storage/tmp_file/test_tmp_file.cpp | 226 +++++++++++++++++-
 .../tmp_file/ob_shared_nothing_tmp_file.cpp   |  27 +-
 .../tmp_file/ob_shared_nothing_tmp_file.h     |  13 +-
 .../tmp_file/ob_tmp_file_flush_ctx.cpp        |   5 +-
 src/storage/tmp_file/ob_tmp_file_flush_ctx.h  |   3 +-
 .../tmp_file/ob_tmp_file_flush_manager.cpp    |   2 +-
 6 files changed, 257 insertions(+), 19 deletions(-)

diff --git a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp
index fb9137d0f0..267ef8d612 100644
--- a/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp
+++ b/mittest/mtlenv/storage/tmp_file/test_tmp_file.cpp
@@ -1008,6 +1008,206 @@ TEST_F(TestTmpFile, test_tmp_file_truncate)
   LOG_INFO("test_tmp_file_truncate");
 }
 
+// Generates data flush infos for three pages while truncating and appending in
+// between, and checks that the flush infos carry consecutive virtual page ids.
+TEST_F(TestTmpFile, test_truncate_to_flushed_page_id)
+{
+  int ret = OB_SUCCESS;
+  const int64_t write_size = 4 * 1024 * 1024 + 12 * 1024;
+  const int64_t wbp_mem_limit = MTL(ObTenantTmpFileManager *)->page_cache_controller_.write_buffer_pool_.get_memory_limit();
+  char *write_buf = new char [write_size];
+  for (int64_t i = 0; i < write_size;) {
+    int64_t random_length = generate_random_int(1024, 8 * 1024);
+    int64_t random_int = generate_random_int(0, 256);
+    for (int64_t j = 0; j < random_length && i + j < write_size; ++j) {
+      write_buf[i + j] = random_int;
+    }
+    i += random_length;
+  }
+
+  int64_t dir = -1;
+  int64_t fd = -1;
+  ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ret = MTL(ObTenantTmpFileManager *)->open(fd, dir);
+  std::cout << "open temporary file: " << fd << std::endl;
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ObTmpFileHandle file_handle;
+  ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;
+
+  ObTmpFileIOInfo io_info;
+  io_info.fd_ = fd;
+  io_info.io_desc_.set_wait_event(2);
+  io_info.buf_ = write_buf;
+  io_info.size_ = write_size;
+  io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
+  // Write data
+  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
+  ASSERT_EQ(OB_SUCCESS, ret);
+
+  sleep(2); // waits for flushing 4MB data pages, 12KB(2 pages) left
+  ret = MTL(ObTenantTmpFileManager *)->truncate(fd, 4 * 1024 * 1024);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(4 * 1024 * 1024, file_handle.get()->truncated_offset_);
+
+  int64_t block_index = -1;
+  ret = MTL(ObTenantTmpFileManager *)->tmp_file_block_manager_.create_tmp_file_block(0/*begin_page_id*/, ObTmpFileGlobal::BLOCK_PAGE_NUMS, block_index);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  int64_t flush_sequence = MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_mgr_.flush_ctx_.get_flush_sequence();
+  ObTmpFileDataFlushContext data_flush_ctx;
+  ObTmpFileFlushTask flush_task;
+  flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
+  flush_task.set_block_index(block_index);
+  ret = flush_task.prealloc_block_buf();
+  EXPECT_EQ(OB_SUCCESS, ret);
+  // copy first page after flush and truncate
+  int64_t last_info_idx = flush_task.get_flush_infos().count() - 1;
+  ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
+        data_flush_ctx, flush_sequence, false/*flush tail*/);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  LOG_INFO("checking flush task", K(flush_task));
+  LOG_INFO("checking data flush ctx", K(data_flush_ctx));
+  int64_t PAGE_SIZE = 8 * 1024;
+  EXPECT_EQ(PAGE_SIZE, flush_task.get_data_length());
+
+  // simulate that we have pushed 1 task to TFFT_INSERT_META_TREE and released the truncate lock
+  // so that another truncate operation can come in.
+  file_handle.get()->truncate_lock_.unlock();
+
+  // truncate again
+  int64_t truncate_offset = 4 * 1024 * 1024 + PAGE_SIZE + PAGE_SIZE / 2;
+  ret = MTL(ObTenantTmpFileManager *)->truncate(fd, truncate_offset);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(truncate_offset, file_handle.get()->truncated_offset_);
+
+  // append 8KB, 12KB left in wbp
+  io_info.size_ = PAGE_SIZE;
+  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(4 * 1024 * 1024 + 20 * 1024, file_handle.get()->file_size_);
+
+  // copy second page
+  flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
+  last_info_idx = flush_task.get_flush_infos().count() - 1;
+  ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
+        data_flush_ctx, flush_sequence, false/*flush tail*/);
+  LOG_INFO("checking flush task", K(flush_task));
+  LOG_INFO("checking data flush ctx", K(data_flush_ctx));
+  EXPECT_EQ(OB_SUCCESS, ret);
+  EXPECT_EQ(PAGE_SIZE * 2, flush_task.get_data_length());
+
+  // copy third page
+  flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
+  last_info_idx = flush_task.get_flush_infos().count() - 1;
+  ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
+        data_flush_ctx, flush_sequence, true/*flush tail*/);
+  LOG_INFO("checking flush task", K(flush_task));
+  LOG_INFO("checking data flush ctx", K(data_flush_ctx));
+  EXPECT_EQ(OB_SUCCESS, ret);
+  EXPECT_EQ(PAGE_SIZE * 3, flush_task.get_data_length());
+
+  int64_t first_virtual_page_id = flush_task.get_flush_infos().at(0).flush_virtual_page_id_;
+  int64_t second_virtual_page_id = flush_task.get_flush_infos().at(1).flush_virtual_page_id_;
+  int64_t third_virtual_page_id = flush_task.get_flush_infos().at(2).flush_virtual_page_id_;
+  EXPECT_EQ(first_virtual_page_id + 1, second_virtual_page_id);
+  EXPECT_EQ(second_virtual_page_id + 1, third_virtual_page_id);
+
+  file_handle.reset();
+  flush_task.~ObTmpFileFlushTask();
+  delete [] write_buf;
+
+  ret = MTL(ObTenantTmpFileManager *)->remove(fd);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_priority_mgr_.get_file_size());
+  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.evict_mgr_.get_file_size());
+
+  LOG_INFO("test_truncate_to_flushed_page_id");
+}
+
+// Writes to the file again after the tail flush info is generated and inserted into the meta tree
+// but before update_meta_after_flush(), then checks the flushed / write-back data page counters.
+TEST_F(TestTmpFile, test_write_last_page_during_flush)
+{
+  int ret = OB_SUCCESS;
+  const int64_t write_size = 64 * 1024 + 100;
+  char *write_buf = new char [write_size];
+  for (int64_t i = 0; i < write_size;) {
+    int64_t random_length = generate_random_int(1024, 8 * 1024);
+    int64_t random_int = generate_random_int(0, 256);
+    for (int64_t j = 0; j < random_length && i + j < write_size; ++j) {
+      write_buf[i + j] = random_int;
+    }
+    i += random_length;
+  }
+
+  int64_t dir = -1;
+  int64_t fd = -1;
+  ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ret = MTL(ObTenantTmpFileManager *)->open(fd, dir);
+  std::cout << "open temporary file: " << fd << std::endl;
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ObTmpFileHandle file_handle;
+  ret = MTL(ObTenantTmpFileManager *)->get_tmp_file(fd, file_handle);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  file_handle.get()->page_idx_cache_.max_bucket_array_capacity_ = SMALL_WBP_IDX_CACHE_MAX_CAPACITY;
+
+  ObTmpFileIOInfo io_info;
+  io_info.fd_ = fd;
+  io_info.io_desc_.set_wait_event(2);
+  io_info.buf_ = write_buf;
+  io_info.size_ = write_size;
+  io_info.io_timeout_ms_ = DEFAULT_IO_WAIT_TIME_MS;
+
+  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
+  ASSERT_EQ(OB_SUCCESS, ret);
+
+  printf("generate_data_flush_info\n");
+  // hard code generate_data_flush_info to flush last page
+  int64_t block_index = -1;
+  ret = MTL(ObTenantTmpFileManager *)->tmp_file_block_manager_.create_tmp_file_block(0/*begin_page_id*/, ObTmpFileGlobal::BLOCK_PAGE_NUMS, block_index);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  int64_t flush_sequence = MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_mgr_.flush_ctx_.get_flush_sequence();
+  ObTmpFileDataFlushContext data_flush_ctx;
+  ObTmpFileFlushTask flush_task;
+  flush_task.get_flush_infos().push_back(ObTmpFileFlushInfo());
+  flush_task.set_block_index(block_index);
+  ret = flush_task.prealloc_block_buf();
+  EXPECT_EQ(OB_SUCCESS, ret);
+  // generate the data flush info with flush tail enabled
+  int64_t last_info_idx = flush_task.get_flush_infos().count() - 1;
+  ret = file_handle.get()->generate_data_flush_info(flush_task, flush_task.get_flush_infos().at(last_info_idx),
+        data_flush_ctx, flush_sequence, true/*flush tail*/);
+  ASSERT_EQ(OB_SUCCESS, ret);
+
+  // insert_meta_tree
+  ret = file_handle.get()->insert_meta_tree_item(flush_task.get_flush_infos().at(0), block_index);
+  ASSERT_EQ(OB_SUCCESS, ret);
+
+  // write before IO complete
+  ret = MTL(ObTenantTmpFileManager *)->write(io_info);
+  ASSERT_EQ(OB_SUCCESS, ret);
+
+  // assume io complete, update file meta
+  bool reset_ctx = false;
+  ret = file_handle.get()->update_meta_after_flush(flush_task.get_flush_infos().at(0).batch_flush_idx_, false/*is_meta*/, reset_ctx);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(8, file_handle.get()->flushed_data_page_num_);
+  ASSERT_EQ(0, file_handle.get()->write_back_data_page_num_);
+
+  flush_task.~ObTmpFileFlushTask();
+  file_handle.reset();
+  ret = MTL(ObTenantTmpFileManager *)->remove(fd);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.flush_priority_mgr_.get_file_size());
+  ASSERT_EQ(0, MTL(ObTenantTmpFileManager *)->page_cache_controller_.evict_mgr_.get_file_size());
+
+  delete [] write_buf;
+  LOG_INFO("test_write_last_page_during_flush");
+}
+
 // generate 750MB random data.
 // this test will trigger flush and evict logic for both data and meta pages.
 void test_big_file(const int64_t write_size, const int64_t wbp_mem_limit, ObTmpFileIOInfo io_info)
@@ -1380,7 +1580,7 @@ TEST_F(TestTmpFile, test_multi_file_multi_thread_read_write_with_block_cache)
 TEST_F(TestTmpFile, test_more_files_more_threads_read_write)
 {
   int ret = OB_SUCCESS;
-  const int64_t read_thread_cnt = 1;
+  const int64_t read_thread_cnt = 2;
   const int64_t file_cnt = 128;
   const int64_t batch_size = 3 * 1024 * 1024;
   const int64_t batch_num = 2; // total 128 * 3MB * 2 = 768MB
@@ -1400,6 +1600,30 @@ TEST_F(TestTmpFile, test_more_files_more_threads_read_write)
   STORAGE_LOG(INFO, "io time", K(io_time));
 }
 
+// Stress run over 256 files, 3 batches of 10KB each, initialised with disable_block_cache = true.
+TEST_F(TestTmpFile, test_multiple_small_files)
+{
+  int ret = OB_SUCCESS;
+  const int64_t read_thread_cnt = 2;
+  const int64_t file_cnt = 256;
+  const int64_t batch_size = 10 * 1024; // 10KB
+  const int64_t batch_num = 3;
+  const bool disable_block_cache = true;
+  TestMultiTmpFileStress test(MTL_CTX());
+  int64_t dir = -1;
+  ret = MTL(ObTenantTmpFileManager *)->alloc_dir(dir);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  ret = test.init(file_cnt, dir, read_thread_cnt, batch_size, batch_num, disable_block_cache);
+  ASSERT_EQ(OB_SUCCESS, ret);
+  int64_t io_time = ObTimeUtility::current_time();
+  test.start();
+  test.wait();
+  io_time = ObTimeUtility::current_time() - io_time;
+
+  STORAGE_LOG(INFO, "test_multiple_small_files");
+  STORAGE_LOG(INFO, "io time", K(io_time));
+}
+
 TEST_F(TestTmpFile, test_big_file)
 {
   const int64_t write_size = 750 * 1024 * 1024; // write 750MB data
diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp
index 76edc77eee..aed11d6365 100644
--- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp
+++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp
@@ -900,9 +900,7 @@ int ObSharedNothingTmpFile::aio_write(ObTmpFileIOCtx &io_ctx)
       if (OB_FAIL(inner_write_(io_ctx))) {
         if (OB_ALLOCATE_TMP_FILE_PAGE_FAILED == ret) {
           ret = OB_SUCCESS;
-          if (TC_REACH_COUNT_INTERVAL(10)) {
-            LOG_INFO("alloc mem failed, try to evict pages", K(fd_), K(file_size_), K(io_ctx));
-          }
+          LOG_INFO("alloc mem failed, try to evict pages", K(fd_), K(file_size_), K(io_ctx), KPC(this));
           if (OB_FAIL(page_cache_controller_->invoke_swap_and_wait(
                   MIN(io_ctx.get_todo_size(), ObTmpFileGlobal::TMP_FILE_WRITE_BATCH_PAGE_NUM * ObTmpFileGlobal::PAGE_SIZE),
                   io_ctx.get_io_timeout_ms()))) {
@@ -1998,12 +1996,23 @@ int ObSharedNothingTmpFile::update_meta_after_flush(const int64_t info_idx, cons
 
     if (FAILEDx(inner_flush_ctx_.update_finished_continuous_flush_info_num(is_meta, end_pos))) {
       LOG_WARN("fail to update finished continuous flush info num", KR(ret), K(start_pos), K(end_pos), KPC(this));
-    } else if (inner_flush_ctx_.is_all_finished()) {
-      if (OB_ISNULL(data_flush_node_.get_next()) && OB_FAIL(reinsert_flush_node_(false /*is_meta*/))) {
-        LOG_ERROR("fail to reinsert data flush node", KR(ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
-      } else if (OB_ISNULL(meta_flush_node_.get_next()) && OB_FAIL(reinsert_flush_node_(true /*is_meta*/))) {
-        LOG_ERROR("fail to reinsert meta flush node", KR(ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
-      } else {
+    } else {
+      // Reinsert each flush node as soon as its own kind (data / meta) has finished instead of
+      // waiting for both kinds; a reinsert failure is only logged through tmp_ret and is not
+      // assigned to ret. The flush ctx is still reset only once both kinds have finished.
+      int tmp_ret = OB_SUCCESS;
+      if (inner_flush_ctx_.is_data_finished()) {
+        if (OB_ISNULL(data_flush_node_.get_next()) && OB_TMP_FAIL(reinsert_flush_node_(false /*is_meta*/))) {
+          LOG_ERROR("fail to reinsert data flush node", KR(tmp_ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
+        }
+      }
+      if (inner_flush_ctx_.is_meta_finished()) {
+        if (OB_ISNULL(meta_flush_node_.get_next()) && OB_TMP_FAIL(reinsert_flush_node_(true /*is_meta*/))) {
+          LOG_ERROR("fail to reinsert meta flush node", KR(tmp_ret), K(is_meta), K(inner_flush_ctx_), KPC(this));
+        }
+      }
+
+      if (inner_flush_ctx_.is_all_finished()) {
         inner_flush_ctx_.reset();
       }
     }
diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h
index 03d05e1b93..441295dbba 100644
--- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h
+++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h
@@ -85,10 +85,17 @@ public:
     }
     void reset();
     int update_finished_continuous_flush_info_num(const bool is_meta, const int64_t end_pos);
-    bool is_all_finished() const
+    // true when both the data and the meta flush infos are finished
+    bool is_all_finished() const { return is_data_finished() && is_meta_finished(); }
+    // true when the finished continuous count equals the number of data flush infos
+    bool is_data_finished() const
     {
-      return data_flush_infos_.size() == data_finished_continuous_flush_info_num_ &&
-             meta_flush_infos_.size() == meta_finished_continuous_flush_info_num_;
+      return data_flush_infos_.size() == data_finished_continuous_flush_info_num_;
+    }
+    // true when the finished continuous count equals the number of meta flush infos
+    bool is_meta_finished() const
+    {
+      return meta_flush_infos_.size() == meta_finished_continuous_flush_info_num_;
     }
     bool is_flushing() const
     {
diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp
index b2c759e3c8..4aa2582779 100644
--- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp
+++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp
@@ -218,7 +218,7 @@ void ObTmpFileBatchFlushContext::record_flush_task(const int64_t data_length)
 
 // -------------- ObTmpFileFlushTask --------------- //
 
-ObTmpFileFlushTask::ObTmpFileFlushTask(ObIAllocator &task_allocator)
+ObTmpFileFlushTask::ObTmpFileFlushTask()
   : inst_handle_(),
     kvpair_(nullptr),
     block_handle_(),
@@ -232,8 +232,7 @@ ObTmpFileFlushTask::ObTmpFileFlushTask(ObIAllocator &task_allocator)
     task_state_(ObTmpFileFlushTaskState::TFFT_INITED),
     tmp_file_block_handle_(),
     handle_(),
-    flush_infos_(),
-    task_allocator_(task_allocator)
+    flush_infos_()
 {
   flush_infos_.set_attr(ObMemAttr(MTL_ID(), "TFFlushInfos"));
 }
diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h
index ad7a8c8ca3..4c97264139 100644
--- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h
+++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h
@@ -227,7 +227,7 @@ public:
 struct ObTmpFileFlushTask : public common::ObSpLinkQueue::Link
 {
 public:
-  ObTmpFileFlushTask(ObIAllocator &task_allocator);
+  ObTmpFileFlushTask();
   ~ObTmpFileFlushTask() { destroy(); }
   enum ObTmpFileFlushTaskState
   {
@@ -297,7 +297,6 @@ private:
   ObTmpFileBlockHandle tmp_file_block_handle_;// hold a reference to the corresponding tmp file block to prevent it from being released
   blocksstable::ObMacroBlockHandle handle_;
   ObArray<ObTmpFileFlushInfo> flush_infos_; // multi file flush into one block if size > 0
-  ObIAllocator &task_allocator_; // ref to ObTmpFilePageCacheController::task_allocator_, used to free data_buf_
 };
 
 } // end namespace tmp_file
diff --git a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp
index d98eecf5f7..2321df3ea4 100644
--- a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp
+++ b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp
@@ -65,7 +65,7 @@ int ObTmpFileFlushManager::alloc_flush_task(ObTmpFileFlushTask *&flush_task)
     ret = OB_ALLOCATE_MEMORY_FAILED;
     STORAGE_LOG(WARN, "fail to allocate memory for flush callback", KR(ret));
   } else {
-    flush_task = new (task_buf) ObTmpFileFlushTask(task_allocator_);
+    flush_task = new (task_buf) ObTmpFileFlushTask();
   }
   return ret;
 }