diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp index 1586d53717..8645623766 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp @@ -233,6 +233,7 @@ ObTmpFileFlushTask::ObTmpFileFlushTask() : inst_handle_(), kvpair_(nullptr), block_handle_(), + write_block_ret_code_(OB_SUCCESS), ret_code_(OB_SUCCESS), data_length_(0), block_index_(-1), @@ -254,6 +255,7 @@ void ObTmpFileFlushTask::destroy() block_handle_.reset(); inst_handle_.reset(); kvpair_ = nullptr; + write_block_ret_code_ = OB_SUCCESS; ret_code_ = OB_SUCCESS; data_length_ = 0; block_index_ = -1; @@ -298,6 +300,8 @@ int ObTmpFileFlushTask::write_one_block() true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect LOG_WARN("failed to update write time", KR(ret), K(handle_)); } + atomic_set_write_block_ret_code(ret); + return ret; } diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h index 13041fb469..c48c02b9c6 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h @@ -256,6 +256,12 @@ public: OB_INLINE char *get_data_buf() const { return block_handle_.value_ == nullptr ? nullptr : block_handle_.value_->get_buffer(); } OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&ret_code_, ret_code); } OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&ret_code_); } + OB_INLINE void atomic_set_write_block_ret_code(int write_block_ret_code) { + ATOMIC_SET(&write_block_ret_code_, write_block_ret_code); + } + OB_INLINE int atomic_get_write_block_ret_code() const { + return ATOMIC_LOAD(&write_block_ret_code_); + } OB_INLINE void set_data_length(const int64_t len) { data_length_ = len; } OB_INLINE int64_t get_data_length() const { return data_length_; } OB_INLINE void set_block_index(const int64_t block_index) { block_index_ = block_index; } @@ -283,13 +289,14 @@ public: return buffer != nullptr && get_data_buf() != nullptr && buffer >= get_data_buf() && buffer + length <= get_data_buf() + OB_SERVER_BLOCK_MGR.get_macro_block_size(); } - TO_STRING_KV(KP(this), KP(kvpair_), K(ret_code_), K(data_length_), + TO_STRING_KV(KP(this), KP(kvpair_), K(write_block_ret_code_), K(ret_code_), K(data_length_), K(block_index_), K(flush_seq_), K(create_ts_), K(is_io_finished_), K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_)); private: ObKVCacheInstHandle inst_handle_; ObKVCachePair *kvpair_; ObTmpBlockValueHandle block_handle_; + int write_block_ret_code_; int ret_code_; int64_t data_length_; // data length (including padding to make length upper align to page size) int64_t block_index_; // tmp file block logical index in ObTmpFileBlockManager diff --git a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp index 802c1633e6..1f09c4ea1c 100644 --- a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp +++ b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp @@ -147,11 +147,15 @@ int ObTmpFilePageCacheController::invoke_swap_and_wait(int64_t expect_swap_size, } if (OB_NOT_NULL(swap_job)) { + if (OB_SUCCESS != swap_job->get_ret_code()) { + ret = swap_job->get_ret_code(); + } // reset swap job to set is_finished to false in case of failure to push into queue: // otherwise job is not finished, but it will not be executed, so it will never become finished. swap_job->reset(); - if (OB_FAIL(free_swap_job_(swap_job))) { - STORAGE_LOG(ERROR, "fail to free swap job", KR(ret)); + int tmp_ret = OB_SUCCESS; + if (OB_TMP_FAIL(free_swap_job_(swap_job))) { + STORAGE_LOG(ERROR, "fail to free swap job", KR(ret), KR(tmp_ret)); } } return ret; diff --git a/src/storage/tmp_file/ob_tmp_file_thread_job.cpp b/src/storage/tmp_file/ob_tmp_file_thread_job.cpp index 7c7d3c2916..1a961daacc 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_job.cpp +++ b/src/storage/tmp_file/ob_tmp_file_thread_job.cpp @@ -33,6 +33,7 @@ int ObTmpFileSwapJob::init(int64_t expect_swap_size, uint32_t timeout_ms) STORAGE_LOG(WARN, "ObTmpFileSwapJob init cond failed", KR(ret)); } else { is_inited_ = true; + ret_code_ = OB_SUCCESS; is_finished_ = false; expect_swap_size_ = expect_swap_size; timeout_ms_ = timeout_ms; @@ -45,6 +46,7 @@ int ObTmpFileSwapJob::init(int64_t expect_swap_size, uint32_t timeout_ms) void ObTmpFileSwapJob::reset() { is_inited_ = false; + ret_code_ = OB_SUCCESS; is_finished_ = false; expect_swap_size_ = 0; timeout_ms_ = DEFAULT_TIMEOUT_MS; @@ -78,7 +80,7 @@ int ObTmpFileSwapJob::wait_swap_complete() } // set swap job is_finished, wake up threads that invoke swap job -int ObTmpFileSwapJob::signal_swap_complete() +int ObTmpFileSwapJob::signal_swap_complete(int ret_code) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -86,6 +88,7 @@ int ObTmpFileSwapJob::signal_swap_complete() STORAGE_LOG(WARN, "ObTmpFileSwapJob not init", KR(ret)); } else { ObThreadCondGuard guard(swap_cond_); + ATOMIC_SET(&ret_code_, ret_code); ATOMIC_SET(&is_finished_, true); if (OB_FAIL(swap_cond_.signal())) { STORAGE_LOG(WARN, "ObTmpFileSwapJob signal swap complete failed", KR(ret)); diff --git a/src/storage/tmp_file/ob_tmp_file_thread_job.h b/src/storage/tmp_file/ob_tmp_file_thread_job.h index 24a92914b1..0145898bed 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_job.h +++ b/src/storage/tmp_file/ob_tmp_file_thread_job.h @@ -29,6 +29,7 @@ public: static const uint32_t DEFAULT_TIMEOUT_MS = 10 * 1000; ObTmpFileSwapJob() : is_inited_(false), + ret_code_(OB_SUCCESS), is_finished_(false), timeout_ms_(DEFAULT_TIMEOUT_MS), create_ts_(0), @@ -39,16 +40,18 @@ public: int init(int64_t expect_swap_size, uint32_t timeout_ms = DEFAULT_TIMEOUT_MS); void reset(); int wait_swap_complete(); - int signal_swap_complete(); + int signal_swap_complete(int ret_code); OB_INLINE int64_t get_create_ts() const { return create_ts_; } OB_INLINE int64_t get_abs_timeout_ts() const { return abs_timeout_ts_; } OB_INLINE int64_t get_expect_swap_size() const { return expect_swap_size_; } OB_INLINE bool is_valid() { return ATOMIC_LOAD(&is_inited_) && swap_cond_.is_inited(); } OB_INLINE bool is_finished() const { return ATOMIC_LOAD(&is_finished_); } OB_INLINE bool is_inited() const { return ATOMIC_LOAD(&is_inited_); } + OB_INLINE int get_ret_code() const { return ATOMIC_LOAD(&ret_code_); } TO_STRING_KV(KP(this), K(is_inited_), K(is_finished_), K(create_ts_), K(timeout_ms_), K(abs_timeout_ts_), K(expect_swap_size_)); private: bool is_inited_; + int ret_code_; bool is_finished_; uint32_t timeout_ms_; int64_t create_ts_; diff --git a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp index e082a6f29a..28ea22fb93 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp @@ -31,6 +31,7 @@ ObTmpFileFlushTG::ObTmpFileFlushTG( : is_inited_(false), mode_(RUNNING_MODE::INVALID), last_flush_timestamp_(0), + flush_io_finished_ret_(OB_SUCCESS), flush_io_finished_round_(0), flushing_block_num_(0), is_fast_flush_meta_(false), @@ -62,6 +63,7 @@ int ObTmpFileFlushTG::init() is_inited_ = true; mode_ = RUNNING_MODE::NORMAL; last_flush_timestamp_ = 0; + flush_io_finished_ret_ = OB_SUCCESS; flush_io_finished_round_ = 0; flushing_block_num_ = 0; @@ -84,6 +86,7 @@ void ObTmpFileFlushTG::destroy() clean_up_lists(); mode_ = RUNNING_MODE::INVALID; last_flush_timestamp_ = 0; + flush_io_finished_ret_ = OB_SUCCESS; flush_io_finished_round_ = 0; flushing_block_num_ = 0; @@ -168,8 +171,9 @@ void ObTmpFileFlushTG::notify_doing_flush() last_flush_timestamp_ = 0; } -void ObTmpFileFlushTG::signal_io_finish() +void ObTmpFileFlushTG::signal_io_finish(int flush_io_finished_ret) { + flush_io_finished_ret_ = flush_io_finished_ret; ++flush_io_finished_round_; } @@ -178,6 +182,11 @@ int64_t ObTmpFileFlushTG::get_flush_io_finished_round() return flush_io_finished_round_; } +int64_t ObTmpFileFlushTG::get_flush_io_finished_ret() +{ + return flush_io_finished_ret_; +} + int64_t ObTmpFileFlushTG::cal_idle_time() { int64_t idle_time = 0; @@ -320,6 +329,10 @@ int ObTmpFileFlushTG::handle_generated_flush_tasks_(ObSpLinkQueue &flushing_list ret = OB_ERR_UNEXPECTED; LOG_ERROR("fail to switch state after data copied from tmp file", KR(ret), KPC(flush_task)); } else if (FlushState::TFFT_FILL_BLOCK_BUF < state) { + if (FlushState::TFFT_ASYNC_WRITE == state && + flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) { + signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE); + } push_retry_list_(flush_task); ATOMIC_INC(&flushing_block_num_); STORAGE_LOG(DEBUG, "push flush task to retry list", KPC(flush_task)); @@ -349,7 +362,7 @@ int ObTmpFileFlushTG::wash_(const int64_t expect_flush_size, const RUNNING_MODE bool idle_loop = flushing_task_cnt == 0; if (idle_loop && wbp_.get_dirty_page_percentage() < ObTmpFileFlushManager::FLUSH_WATERMARK_F5) { - signal_io_finish(); + signal_io_finish(OB_SUCCESS); } if (RUNNING_MODE::NORMAL == mode) { @@ -388,6 +401,10 @@ int ObTmpFileFlushTG::retry_task_() } else if (FlushState::TFFT_WAIT == state) { push_wait_list_(flush_task); } else if (FlushState::TFFT_FILL_BLOCK_BUF < state) { + if (FlushState::TFFT_ASYNC_WRITE == state && + flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) { + signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE); + } push_retry_list_(flush_task); if (FlushState::TFFT_INSERT_META_TREE == state && OB_ALLOCATE_TMP_FILE_PAGE_FAILED == ret) { STORAGE_LOG(WARN, "fail to retry insert meta item in TFFT_INSERT_META_TREE", KPC(flush_task)); @@ -486,7 +503,7 @@ void ObTmpFileFlushTG::flush_task_finished_(ObTmpFileFlushTask *flush_task) ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "fast_flush_meta_task_cnt_ is negative", KR(ret), KPC(this)); } - signal_io_finish(); + signal_io_finish(OB_SUCCESS); } int ObTmpFileFlushTG::push_wait_list_(ObTmpFileFlushTask *flush_task) @@ -786,7 +803,7 @@ void ObTmpFileSwapTG::clean_up_lists_() ObTmpFileSwapJob *swap_job = nullptr; if (OB_FAIL(swap_job_dequeue(swap_job))) { STORAGE_LOG(WARN, "fail dequeue swap job or swap job is nullptr", KR(ret), KP(swap_job)); - } else if (OB_FAIL(swap_job->signal_swap_complete())){ + } else if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))){ STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret)); } } @@ -796,7 +813,7 @@ void ObTmpFileSwapTG::clean_up_lists_() ObTmpFileSwapJob *swap_job = nullptr; if (OB_FAIL(pop_working_job_(swap_job))) { STORAGE_LOG(WARN, "fail to pop working job or ptr is null", KR(ret), KP(swap_job)); - } else if (OB_FAIL(swap_job->signal_swap_complete())){ + } else if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))){ STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret)); } } @@ -898,6 +915,10 @@ int ObTmpFileSwapTG::swap_fast_() int64_t wakeup_job_cnt = 0; wakeup_satisfied_jobs_(wakeup_job_cnt); wakeup_timeout_jobs_(); + int io_finished_ret = flush_tg_ref_.get_flush_io_finished_ret(); + if (OB_SERVER_OUTOF_DISK_SPACE == io_finished_ret) { + wakeup_all_jobs_(OB_SERVER_OUTOF_DISK_SPACE); + } // do flush if could not evict enough pages if (OB_SUCC(ret) && !working_list_.is_empty() && wakeup_job_cnt < PROCCESS_JOB_NUM_PER_BATCH) { @@ -953,7 +974,7 @@ int ObTmpFileSwapTG::wakeup_satisfied_jobs_(int64_t& wakeup_job_cnt) wbp_free_page_cnt -= min(wbp_free_page_cnt, single_job_swap_page_cnt); int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts(); swap_monitor_.record_swap_response_time(response_time); - if (OB_FAIL(swap_job->signal_swap_complete())) { + if (OB_FAIL(swap_job->signal_swap_complete(OB_SUCCESS))) { STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KPC(swap_job)); } else { ++wakeup_job_cnt; @@ -974,7 +995,7 @@ int ObTmpFileSwapTG::wakeup_timeout_jobs_() // timeout, wake it up int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts(); swap_monitor_.record_swap_response_time(response_time); - if (OB_FAIL(swap_job->signal_swap_complete())) { + if (OB_FAIL(swap_job->signal_swap_complete(OB_TIMEOUT))) { STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KP(swap_job)); } } else { @@ -986,6 +1007,23 @@ int ObTmpFileSwapTG::wakeup_timeout_jobs_() return ret; } +void ObTmpFileSwapTG::wakeup_all_jobs_(int ret_code) +{ + int ret = OB_SUCCESS; + for (int64_t i = working_list_size_; OB_SUCC(ret) && i > 0 && !working_list_.is_empty(); --i) { + ObTmpFileSwapJob *swap_job = nullptr; + if (OB_FAIL(pop_working_job_(swap_job))) { + STORAGE_LOG(WARN, "fail to pop working job or ptr is null", KR(ret), KP(swap_job)); + } else { + int64_t response_time = ObTimeUtility::current_time() - swap_job->get_create_ts(); + swap_monitor_.record_swap_response_time(response_time); + if (OB_FAIL(swap_job->signal_swap_complete(ret_code))) { + STORAGE_LOG(WARN, "fail to signal swap complete", KR(ret), KP(swap_job)); + } + } + } +} + int ObTmpFileSwapTG::push_working_job_(ObTmpFileSwapJob *swap_job) { int ret = OB_SUCCESS; diff --git a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h index 1afedd17ca..123de50d96 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h @@ -50,7 +50,8 @@ public: int try_work(); void set_running_mode(const RUNNING_MODE mode); void notify_doing_flush(); - void signal_io_finish(); + void signal_io_finish(int flush_io_finished_ret); + int64_t get_flush_io_finished_ret(); int64_t get_flush_io_finished_round(); int64_t cal_idle_time(); void clean_up_lists(); @@ -79,6 +80,7 @@ private: bool is_inited_; RUNNING_MODE mode_; int64_t last_flush_timestamp_; + int flush_io_finished_ret_; int64_t flush_io_finished_round_; int64_t flushing_block_num_; // maintain it when ObTmpFileFlushTask is created and freed @@ -133,6 +135,7 @@ private: int calculate_swap_page_num_(const int64_t batch_size, int64_t &expect_swap_cnt); int wakeup_satisfied_jobs_(int64_t& wakeup_job_cnt); int wakeup_timeout_jobs_(); + void wakeup_all_jobs_(int ret_code); private: bool is_inited_; int tg_id_;