From 9b2deedb61a7aeac643827d3dcac48844b43f512 Mon Sep 17 00:00:00 2001 From: obdev Date: Mon, 26 Aug 2024 07:26:48 +0000 Subject: [PATCH] If the tmp file is deleted, the flush task will not be retried; delete the tmp file as soon as possible; --- .../tmp_file/ob_tmp_file_flush_ctx.cpp | 4 ++- src/storage/tmp_file/ob_tmp_file_flush_ctx.h | 1 + .../tmp_file/ob_tmp_file_flush_manager.cpp | 29 +++++++++++++-- .../tmp_file/ob_tmp_file_flush_manager.h | 1 + .../tmp_file/ob_tmp_file_thread_wrapper.cpp | 35 ++++++++++++------- .../tmp_file/ob_tmp_file_thread_wrapper.h | 1 + 6 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp index cfd2cac01..1586d5371 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp @@ -197,7 +197,9 @@ void ObTmpFileBatchFlushContext::try_update_prepare_finished_cnt(const ObTmpFile if (OB_UNLIKELY(flush_seq_ctx_.prepare_finished_cnt_ >= flush_seq_ctx_.create_flush_task_cnt_)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("unexpected flush_seq_ctx_", KPC(this)); - } else if (ObTmpFileFlushTask::TFFT_WAIT == flush_task.get_state() || flush_task.get_data_length() == 0) { + } else if (ObTmpFileFlushTask::TFFT_WAIT == flush_task.get_state() || + ObTmpFileFlushTask::TFFT_ABORT == flush_task.get_state() || + flush_task.get_data_length() == 0) { ++flush_seq_ctx_.prepare_finished_cnt_; recorded = true; } diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h index 259379fac..13041fb46 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h @@ -240,6 +240,7 @@ public: TFFT_ASYNC_WRITE = 5, TFFT_WAIT = 6, TFFT_FINISH = 7, + TFFT_ABORT = 8, }; public: void destroy(); diff --git a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp index 2ae360ecd..e8f21780d 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp @@ -650,6 +650,27 @@ int ObTmpFileFlushManager::drive_flush_task_prepare_(ObTmpFileFlushTask &flush_t return ret; } +void ObTmpFileFlushManager::try_remove_unused_flush_info_(ObTmpFileFlushTask &flush_task) +{ + int ret = OB_SUCCESS; + + ObArray &flush_infos = flush_task.get_flush_infos(); + for (int64_t i = 0; OB_SUCC(ret) && i >= 0 && i < flush_infos.count(); ++i) { + ObTmpFileFlushInfo &flush_info = flush_infos.at(i); + ObSharedNothingTmpFile *file = flush_info.file_handle_.get(); + if (OB_ISNULL(file)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(WARN, "file is nullptr", KR(ret), K(flush_info)); + } else if (file->is_deleting()) { + STORAGE_LOG(INFO, "the file is deleting, abort this flush info", + KR(ret), K(flush_info), K(flush_task)); + flush_info.reset(); + flush_infos.remove(i); + --i; + } + } +} + int ObTmpFileFlushManager::drive_flush_task_retry_( ObTmpFileFlushTask &flush_task, const FlushState state, @@ -664,7 +685,11 @@ int ObTmpFileFlushManager::drive_flush_task_retry_( } break; case FlushState::TFFT_ASYNC_WRITE: - if (OB_FAIL(handle_async_write_(flush_task, next_state))) { + try_remove_unused_flush_info_(flush_task); + if (0 == flush_task.get_flush_infos().count()) { + STORAGE_LOG(INFO, "all flush info is aborted", KR(ret), K(flush_task)); + next_state = FlushState::TFFT_ABORT; + } else if (OB_FAIL(handle_async_write_(flush_task, next_state))) { STORAGE_LOG(WARN, "fail to handle flush task async write", KR(ret), K(flush_task)); } break; @@ -710,7 +735,7 @@ int ObTmpFileFlushManager::retry(ObTmpFileFlushTask &flush_task) } else if (OB_FAIL(advance_status_(flush_task, next_state))) { STORAGE_LOG(WARN, "fail to advance status", KR(ret), K(state), K(next_state), K(flush_task)); } - } while (OB_SUCC(ret) && FlushState::TFFT_WAIT != next_state); + } while (OB_SUCC(ret) && FlushState::TFFT_WAIT != next_state && FlushState::TFFT_ABORT != next_state); if (!flush_task.get_recorded_as_prepare_finished()) { if (flush_task.get_flush_seq() != flush_ctx_.get_flush_sequence()) { diff --git a/src/storage/tmp_file/ob_tmp_file_flush_manager.h b/src/storage/tmp_file/ob_tmp_file_flush_manager.h index 87e1383eb..d52b0407a 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_manager.h +++ b/src/storage/tmp_file/ob_tmp_file_flush_manager.h @@ -110,6 +110,7 @@ private: int evict_pages_and_retry_insert_(ObTmpFileFlushTask &flush_task, ObTmpFileFlushInfo &flush_info, const int64_t logic_block_index); + void try_remove_unused_flush_info_(ObTmpFileFlushTask &flush_task); DISALLOW_COPY_AND_ASSIGN(ObTmpFileFlushManager); private: bool is_inited_; diff --git a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp index d08c739e9..e082a6f29 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp @@ -382,7 +382,10 @@ int ObTmpFileFlushTG::retry_task_() } // push task into wait_list_/retry_list_ according to task state, ignore error code FlushState state = flush_task->get_state(); - if (FlushState::TFFT_WAIT == state) { + if (FlushState::TFFT_ABORT == state) { + STORAGE_LOG(INFO, "free abort flush task", KPC(flush_task)); + flush_task_finished_(flush_task); + } else if (FlushState::TFFT_WAIT == state) { push_wait_list_(flush_task); } else if (FlushState::TFFT_FILL_BLOCK_BUF < state) { push_retry_list_(flush_task); @@ -455,29 +458,37 @@ int ObTmpFileFlushTG::check_flush_task_io_finished_() ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "flush task is nullptr", KR(ret)); } else { - bool is_flush_tree_page = flush_task->get_is_fast_flush_tree(); STORAGE_LOG(DEBUG, "flush task io complete", K(flushing_block_num_), KPC(flush_task)); // if the update fails, it will be retried during the next wakeup if (OB_FAIL(flush_mgr_.update_file_meta_after_flush(*flush_task))) { STORAGE_LOG(WARN, "fail to drive flush state machine", KR(ret), KPC(flush_task)); push_finished_list_(flush_task); } else { - flush_mgr_.free_flush_task(flush_task); - ATOMIC_DEC(&flushing_block_num_); - fast_flush_meta_task_cnt_ -= is_flush_tree_page ? 1 : 0; - if (fast_flush_meta_task_cnt_ == 0) { - // reset is_fast_flush_meta_ flag to resume retry task and flush - is_fast_flush_meta_ = false; - } else if (OB_UNLIKELY(fast_flush_meta_task_cnt_ < 0)) { - STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KPC(this)); - } - signal_io_finish(); + flush_task_finished_(flush_task); } } } return ret; } +void ObTmpFileFlushTG::flush_task_finished_(ObTmpFileFlushTask *flush_task) +{ + int ret = OB_SUCCESS; + + bool is_flush_tree_page = flush_task->get_is_fast_flush_tree(); + flush_mgr_.free_flush_task(flush_task); + ATOMIC_DEC(&flushing_block_num_); + fast_flush_meta_task_cnt_ -= is_flush_tree_page ? 1 : 0; + if (fast_flush_meta_task_cnt_ == 0) { + // reset is_fast_flush_meta_ flag to resume retry task and flush + is_fast_flush_meta_ = false; + } else if (OB_UNLIKELY(fast_flush_meta_task_cnt_ < 0)) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KR(ret), KPC(this)); + } + signal_io_finish(); +} + int ObTmpFileFlushTG::push_wait_list_(ObTmpFileFlushTask *flush_task) { int ret = OB_SUCCESS; diff --git a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h index 79fe3fbae..1afedd17c 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h @@ -74,6 +74,7 @@ private: int pop_retry_list_(ObTmpFileFlushTask *&flush_task); int push_finished_list_(ObTmpFileFlushTask *flush_task); int pop_finished_list_(ObTmpFileFlushTask *&flush_task); + void flush_task_finished_(ObTmpFileFlushTask *flush_task); private: bool is_inited_; RUNNING_MODE mode_;