From 7dfea265d11e2ebd451d881f346ab615de9c68cd Mon Sep 17 00:00:00 2001 From: obdev Date: Wed, 18 Sep 2024 08:15:17 +0000 Subject: [PATCH] split tmp file flush thread into 4 timers for performance reason --- src/share/ob_thread_define.h | 1 + .../tmp_file/ob_shared_nothing_tmp_file.cpp | 43 +++-- .../tmp_file/ob_shared_nothing_tmp_file.h | 3 +- .../tmp_file/ob_tmp_file_flush_ctx.cpp | 151 ++++++++++++++-- src/storage/tmp_file/ob_tmp_file_flush_ctx.h | 48 ++++- .../tmp_file/ob_tmp_file_flush_manager.cpp | 61 ++++--- .../tmp_file/ob_tmp_file_flush_manager.h | 5 +- src/storage/tmp_file/ob_tmp_file_global.h | 1 + .../ob_tmp_file_page_cache_controller.cpp | 4 + .../ob_tmp_file_page_cache_controller.h | 1 + .../tmp_file/ob_tmp_file_thread_wrapper.cpp | 169 +++++++++++++----- .../tmp_file/ob_tmp_file_thread_wrapper.h | 9 +- 12 files changed, 387 insertions(+), 109 deletions(-) diff --git a/src/share/ob_thread_define.h b/src/share/ob_thread_define.h index eae740832..5f106b4d7 100755 --- a/src/share/ob_thread_define.h +++ b/src/share/ob_thread_define.h @@ -197,4 +197,5 @@ TG_DEF(ObPrivateBlockGCThread, PrivGCThread, QUEUE_THREAD, TG_DEF(TmpFileSwap, TFSwap, THREAD_POOL, 1) +TG_DEF(TmpFileFlush, TFFlush, TIMER, 1024) #endif diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp index 446013c5b..757189d15 100644 --- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp +++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.cpp @@ -2505,10 +2505,13 @@ int ObSharedNothingTmpFile::generate_data_flush_info_( ret = OB_ERR_UNEXPECTED; LOG_WARN("flush sequence not match", KR(ret), K(inner_flush_ctx_.flush_seq_), K(flush_task), KPC(this)); - } else if (OB_FAIL(copy_flush_data_from_wbp_(flush_task, info, data_flush_context, - copy_begin_page_id, copy_begin_page_virtual_id, - copy_end_page_id, - flush_sequence, need_flush_tail))) { + } else if (ObTmpFileFlushTask::TaskType::META == flush_task.get_type()) { + ret = 
OB_ERR_UNEXPECTED; + LOG_WARN("flush task type is unexpected", KR(ret), K(flush_task), K(data_flush_context), KPC(this)); + } else if (OB_FAIL(collect_flush_data_page_id_(flush_task, info, data_flush_context, + copy_begin_page_id, copy_begin_page_virtual_id, + copy_end_page_id, + flush_sequence, need_flush_tail))) { LOG_WARN("fail to copy flush data from wbp", KR(ret), K(flush_task), K(info), K(data_flush_context), KPC(this)); } @@ -2549,7 +2552,7 @@ int ObSharedNothingTmpFile::generate_data_flush_info( return ret; } -int ObSharedNothingTmpFile::copy_flush_data_from_wbp_( +int ObSharedNothingTmpFile::collect_flush_data_page_id_( ObTmpFileFlushTask &flush_task, ObTmpFileFlushInfo &info, ObTmpFileDataFlushContext &data_flush_context, @@ -2571,8 +2574,9 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_( uint32_t cur_flushed_page_id = ObTmpFileGlobal::INVALID_PAGE_ID; uint32_t cur_page_id = copy_begin_page_id; int64_t cur_page_virtual_id = copy_begin_page_virtual_id; + int64_t collected_page_cnt = 0; - if (OB_ISNULL(buf) || OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) { + if (OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid buf or write_offset", KR(ret), KP(buf), K(write_offset), K(flush_task), KPC(this)); } else if (OB_FAIL(inner_flush_ctx_.data_flush_infos_.push_back(InnerFlushInfo()))) { @@ -2596,21 +2600,21 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_( LOG_WARN("fail to read page", KR(ret), K(fd_), K(cur_page_id)); } else if (OB_FAIL(wbp_->notify_write_back(fd_, cur_page_id, ObTmpFilePageUniqKey(cur_page_virtual_id)))) { LOG_WARN("fail to notify write back", KR(ret), K(fd_), K(cur_page_id)); - } else if (OB_UNLIKELY(!flush_task.check_buf_range_valid(buf, ObTmpFileGlobal::PAGE_SIZE))) { - ret = OB_ERR_UNEXPECTED; - LOG_WARN("invalid buffer range", KR(ret), K(fd_), K(write_offset), KP(buf)); + } else if 
(OB_FAIL(flush_task.get_flush_page_id_arr().push_back(cur_page_id))) { + LOG_ERROR("fail to push back flush page id", KR(ret), K(fd_), K(cur_page_id), KPC(this)); + ret = OB_ITER_END; // override error code } else { // ObTmpPageCacheKey cache_key(flush_task.get_block_index(), // write_offset / ObTmpFileGlobal::PAGE_SIZE, tenant_id_); // ObTmpPageCacheValue cache_value(page_buf); // ObTmpPageCache::get_instance().try_put_page_to_cache(cache_key, cache_value); - MEMCPY(buf + write_offset, page_buf, ObTmpFileGlobal::PAGE_SIZE); write_offset += ObTmpFileGlobal::PAGE_SIZE; flushing_page_num += 1; cur_flushed_page_id = cur_page_id; cur_page_id = next_page_id; cur_page_virtual_id += 1; + collected_page_cnt += 1; if (original_state_is_dirty) { write_back_data_page_num_++; } @@ -2658,11 +2662,16 @@ int ObSharedNothingTmpFile::copy_flush_data_from_wbp_( data_flush_context.set_has_flushed_last_partially_written_page(true); } flush_task.set_data_length(write_offset); + flush_task.set_buffer_pool_ptr(wbp_); + flush_task.set_type(ObTmpFileFlushTask::DATA); inner_flush_ctx_.flush_seq_ = flush_sequence; } else { LOG_WARN("fail to generate data flush info", KR(ret), K(fd_), K(need_flush_tail), K(flush_sequence), K(data_flush_context), K(info), K(flush_task), KPC(this)); + for (int32_t i = 0; i < collected_page_cnt; ++i) { + flush_task.get_flush_page_id_arr().pop_back(); + } if (inner_flush_ctx_.data_flush_infos_.size() == origin_info_num + 1) { inner_flush_ctx_.data_flush_infos_.pop_back(); } @@ -2697,6 +2706,9 @@ int ObSharedNothingTmpFile::generate_meta_flush_info_( ret = OB_ERR_UNEXPECTED; LOG_WARN("flush sequence not match", KR(ret), K(flush_sequence), K(inner_flush_ctx_.flush_seq_), K(flush_task), KPC(this)); + } else if (ObTmpFileFlushTask::TaskType::DATA == flush_task.get_type()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("flush task type is unexpected", KR(ret), K(flush_task), K(meta_flush_context), KPC(this)); } else if (OB_ISNULL(buf) || 
OB_UNLIKELY(OB_STORAGE_OBJECT_MGR.get_macro_object_size() <= write_offset)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid buf or write_offset", KR(ret), KP(buf), K(write_offset), K(flush_task), KPC(this)); @@ -2725,6 +2737,7 @@ int ObSharedNothingTmpFile::generate_meta_flush_info_( if (OB_SUCC(ret)) { flush_task.set_data_length(write_offset); + flush_task.set_type(ObTmpFileFlushTask::META); inner_flush_ctx_.flush_seq_ = flush_sequence; } else { LOG_WARN("fail to generate meta flush info", KR(ret), K(fd_), K(flush_task), @@ -2788,9 +2801,6 @@ int ObSharedNothingTmpFile::insert_meta_tree_item(const ObTmpFileFlushInfo &info last_page_lock_.unlock(); } - if (OB_SUCC(ret)) { - truncate_lock_.unlock(); - } } else { ret = OB_ERR_UNEXPECTED; LOG_WARN("flush info does not contain data info", KR(ret), K(info), KPC(this)); @@ -2826,6 +2836,13 @@ int ObSharedNothingTmpFile::insert_meta_tree_item(const ObTmpFileFlushInfo &info return ret; } +int ObSharedNothingTmpFile::copy_finish() +{ + int ret = OB_SUCCESS; + truncate_lock_.unlock(); + return ret; +} + int ObSharedNothingTmpFile::cal_next_flush_page_id_from_flush_ctx_or_file_( const ObTmpFileDataFlushContext &data_flush_context, uint32_t &next_flush_page_id, diff --git a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h index 0b9e6d789..61adbf46d 100644 --- a/src/storage/tmp_file/ob_shared_nothing_tmp_file.h +++ b/src/storage/tmp_file/ob_shared_nothing_tmp_file.h @@ -270,6 +270,7 @@ public: const int64_t flush_sequence, const bool need_flush_tail); int insert_meta_tree_item(const ObTmpFileFlushInfo &info, int64_t block_index); + int copy_finish(); public: int remove_flush_node(const bool is_meta); int reinsert_flush_node(const bool is_meta); @@ -346,7 +347,7 @@ private: int64_t &actual_write_size); int truncate_the_first_wbp_page_(); - int copy_flush_data_from_wbp_(ObTmpFileFlushTask &flush_task, ObTmpFileFlushInfo &info, + int collect_flush_data_page_id_(ObTmpFileFlushTask 
&flush_task, ObTmpFileFlushInfo &info, ObTmpFileDataFlushContext &data_flush_context, const uint32_t copy_begin_page_id, const int64_t copy_begin_page_virtual_id, const uint32_t copy_end_page_id, const int64_t flush_sequence, const bool need_flush_tail); diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp index ffa24db67..05175413b 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.cpp @@ -20,6 +20,23 @@ namespace oceanbase namespace tmp_file { +ObTmpFileWriteBlockTask::ObTmpFileWriteBlockTask(ObTmpFileFlushTask &flush_task) + : flush_task_(flush_task) +{ +} + +void ObTmpFileWriteBlockTask::runTimerTask() +{ + int ret = OB_SUCCESS; + + if (OB_FAIL(flush_task_.write_one_block())) { + STORAGE_LOG(WARN, "fail to async write blocks", KR(ret), K(flush_task_)); + } + + flush_task_.atomic_set_write_block_ret_code(ret); + flush_task_.atomic_set_write_block_executed(true); +} + ObTmpFileFlushInfo::ObTmpFileFlushInfo() : fd_(ObTmpFileGlobal::INVALID_TMP_FILE_FD), batch_flush_idx_(0), @@ -239,19 +256,23 @@ ObTmpFileFlushTask::ObTmpFileFlushTask() : inst_handle_(), kvpair_(nullptr), block_handle_(), + flush_page_id_arr_(), write_block_ret_code_(OB_SUCCESS), - ret_code_(OB_SUCCESS), + io_result_ret_code_(OB_SUCCESS), data_length_(0), block_index_(-1), flush_seq_(-1), create_ts_(-1), + is_write_block_executed_(false), is_io_finished_(false), fast_flush_tree_page_(false), recorded_as_prepare_finished_(false), + type_(TaskType::INVALID), task_state_(ObTmpFileFlushTaskState::TFFT_INITED), tmp_file_block_handle_(), handle_(), - flush_infos_() + flush_infos_(), + flush_write_block_task_(*this), wbp_(nullptr) { flush_infos_.set_attr(ObMemAttr(MTL_ID(), "TFFlushInfos")); } @@ -259,17 +280,20 @@ ObTmpFileFlushTask::ObTmpFileFlushTask() void ObTmpFileFlushTask::destroy() { block_handle_.reset(); + flush_page_id_arr_.reset(); inst_handle_.reset(); kvpair_ = nullptr; write_block_ret_code_ =
OB_SUCCESS; - ret_code_ = OB_SUCCESS; + io_result_ret_code_ = OB_SUCCESS; data_length_ = 0; block_index_ = -1; flush_seq_ = -1; create_ts_ = -1; + is_write_block_executed_ = false; is_io_finished_ = false; fast_flush_tree_page_ = false; recorded_as_prepare_finished_ = false; + type_ = TaskType::INVALID; task_state_ = ObTmpFileFlushTaskState::TFFT_INITED; flush_infos_.reset(); } @@ -287,27 +311,120 @@ int ObTmpFileFlushTask::prealloc_block_buf() return ret; } +int ObTmpFileFlushTask::lazy_alloc_and_fill_block_buf_for_data_page_() +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(flush_page_id_arr_.count() <= 0 || + flush_page_id_arr_.count() > ObTmpFileGlobal::BLOCK_PAGE_NUMS || + flush_page_id_arr_.count() != upper_align(data_length_, ObTmpFileGlobal::PAGE_SIZE) / ObTmpFileGlobal::PAGE_SIZE)){ + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid flush page id array size", KR(ret), K(flush_page_id_arr_.count()), KPC(this)); + } else if (flush_infos_.size() == 0) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("flush_infos_ is empty", KR(ret), KPC(this)); + } else if (OB_ISNULL(wbp_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("write buffer pool ptr is nullptr", KR(ret), KPC(this)); + } else if (OB_FAIL(prealloc_block_buf())) { + LOG_WARN("fail to prealloc block", KR(ret), K(block_index_), KPC(this)); + } else { + char* page_buf = nullptr; + int32_t copy_index = 0; + for (int32_t i = 0; OB_SUCC(ret) && i < flush_infos_.count(); ++i) { + ObTmpFileFlushInfo &flush_info = flush_infos_.at(i); + int64_t cur_info_fd = flush_info.fd_; + int64_t cur_info_disk_begin_id = flush_info.flush_data_page_disk_begin_id_; + int64_t cur_info_page_num = flush_info.flush_data_page_num_; + int64_t cur_info_virtual_page_id = flush_info.flush_virtual_page_id_; + if (copy_index != cur_info_disk_begin_id) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("unexpected flush info", KR(ret), K(i), K(copy_index), KPC(this)); + } + while (OB_SUCC(ret) && copy_index < cur_info_disk_begin_id + cur_info_page_num + && copy_index 
< flush_page_id_arr_.count()) { + uint32_t page_id = flush_page_id_arr_.at(copy_index); + uint32_t next_page_id = ObTmpFileGlobal::INVALID_PAGE_ID; // UNUSED + if (OB_FAIL(wbp_->read_page(cur_info_fd, page_id, ObTmpFilePageUniqKey(cur_info_virtual_page_id), page_buf, next_page_id))) { + LOG_WARN("fail to read page", KR(ret), K(page_id), KPC(this)); + } else if (OB_UNLIKELY(!check_buf_range_valid(get_data_buf(), ObTmpFileGlobal::PAGE_SIZE))) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid buffer range", KR(ret), KP(get_data_buf()), KPC(this)); + } else { + // only copy the size we recorded if we need to flush the last page + // since we do not hold last_page_lock and the last page may be appended + int64_t copy_size = ObTmpFileGlobal::PAGE_SIZE; + if (flush_info.file_size_ != 0) { + if (cur_info_disk_begin_id + cur_info_page_num - 1 != copy_index) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid flush info", + KR(ret), K(cur_info_disk_begin_id), K(cur_info_page_num), K(copy_index), K(flush_info), KPC(this)); + } else { + copy_size = flush_info.file_size_ % ObTmpFileGlobal::PAGE_SIZE; + } + } + if (OB_SUCC(ret)) { + MEMCPY(get_data_buf() + copy_index * ObTmpFileGlobal::PAGE_SIZE, page_buf, copy_size); + } + } + copy_index += 1; + cur_info_virtual_page_id += 1; + } + } + + if (OB_FAIL(ret)) { + LOG_ERROR("fail to read page and fill block buf", KR(ret), KPC(this)); + } + + // release truncate_lock regardless of ret + for (int32_t i = 0; i < flush_infos_.count(); ++i) { + flush_infos_.at(i).file_handle_.get()->copy_finish(); + } + + if (OB_SUCC(ret)) { + int tmp_ret = OB_SUCCESS; + if (TaskType::DATA != type_) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("invalid task type when copy data in writing block", KR(ret), KPC(this)); + } else if (OB_TMP_FAIL(ObTmpBlockCache::get_instance().put_block(get_inst_handle(), + get_kvpair(), + get_block_handle()))) { + LOG_WARN("fail to put block into block cache", KR(tmp_ret), KR(ret), KPC(this)); + } + } + } + return ret; +} + int 
ObTmpFileFlushTask::write_one_block() { int ret = OB_SUCCESS; handle_.reset(); - blocksstable::ObMacroBlockWriteInfo write_info; - write_info.io_desc_.set_wait_event(2); // TODO: 检查是否需要用临时文件自己的event - write_info.io_desc_.set_resource_group_id(THIS_WORKER.get_group_id()); - write_info.io_desc_.set_sys_module_id(ObIOModule::TMP_TENANT_MEM_BLOCK_IO); - write_info.buffer_ = get_data_buf(); - write_info.size_ = OB_STORAGE_OBJECT_MGR.get_macro_object_size(); - write_info.offset_ = 0; - - if (FAILEDx(blocksstable::ObBlockManager::async_write_block(write_info, handle_))) { - LOG_ERROR("fail to async write block", KR(ret), K(write_info)); - } else if (OB_FAIL(OB_SERVER_BLOCK_MGR.update_write_time(handle_.get_macro_id(), - true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect - LOG_WARN("failed to update write time", KR(ret), K(handle_)); + if (!block_handle_.is_valid()) { + if (TaskType::DATA != type_) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("block handle is null when writing non data task", KR(ret), KPC(this)); + } else if (OB_FAIL(lazy_alloc_and_fill_block_buf_for_data_page_())) { + LOG_WARN("fail to lazy alloc and fill block buf", KR(ret), KPC(this)); + } } - atomic_set_write_block_ret_code(ret); + if (OB_SUCC(ret)) { + blocksstable::ObMacroBlockWriteInfo write_info; + write_info.io_desc_.set_wait_event(2); // TODO: 检查是否需要用临时文件自己的event + write_info.io_desc_.set_resource_group_id(THIS_WORKER.get_group_id()); + write_info.io_desc_.set_sys_module_id(ObIOModule::TMP_TENANT_MEM_BLOCK_IO); + write_info.buffer_ = get_data_buf(); + write_info.size_ = upper_align(get_data_length(), ObTmpFileGlobal::PAGE_SIZE); + write_info.offset_ = 0; + if (OB_FAIL(blocksstable::ObBlockManager::async_write_block(write_info, handle_))) { + LOG_ERROR("fail to async write block", KR(ret), K(write_info)); + } else if (OB_FAIL(OB_SERVER_BLOCK_MGR.update_write_time(handle_.get_macro_id(), + true/*update_to_max_time)*/))){ // update to max time to skip bad block inspect + 
LOG_WARN("failed to update write time", KR(ret), K(handle_)); + } + } return ret; } diff --git a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h index ca4c03c71..d0defd1f7 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_ctx.h +++ b/src/storage/tmp_file/ob_tmp_file_flush_ctx.h @@ -28,6 +28,16 @@ namespace tmp_file { class ObTmpFileFlushTG; +class ObTmpFileWriteBlockTask : public common::ObTimerTask +{ +public: + explicit ObTmpFileWriteBlockTask(ObTmpFileFlushTask &flush_task); + virtual ~ObTmpFileWriteBlockTask() {} + virtual void runTimerTask() override; +private: + ObTmpFileFlushTask &flush_task_; +}; + struct ObTmpFileDataFlushContext { public: @@ -242,6 +252,12 @@ public: TFFT_FINISH = 7, TFFT_ABORT = 8, }; + enum TaskType + { + INVALID = -1, + DATA = 0, + META = 1, + }; public: void destroy(); int prealloc_block_buf(); @@ -254,8 +270,11 @@ public: OB_INLINE bool is_valid() const { return OB_NOT_NULL(get_data_buf()); } OB_INLINE bool is_full() const { return data_length_ == OB_STORAGE_OBJECT_MGR.get_macro_object_size(); } OB_INLINE char *get_data_buf() const { return block_handle_.value_ == nullptr ? 
nullptr : block_handle_.value_->get_buffer(); } - OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&ret_code_, ret_code); } - OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&ret_code_); } + OB_INLINE ObSEArray& get_flush_page_id_arr() { + return flush_page_id_arr_; + } + OB_INLINE void atomic_set_ret_code(int ret_code) { ATOMIC_SET(&io_result_ret_code_, ret_code); } + OB_INLINE int atomic_get_ret_code() const { return ATOMIC_LOAD(&io_result_ret_code_); } OB_INLINE void atomic_set_write_block_ret_code(int write_block_ret_code) { ATOMIC_SET(&write_block_ret_code_, write_block_ret_code); } @@ -276,6 +295,8 @@ public: OB_INLINE bool get_is_fast_flush_tree() const { return fast_flush_tree_page_; } OB_INLINE void mark_recorded_as_prepare_finished() { recorded_as_prepare_finished_ = true; } OB_INLINE bool get_recorded_as_prepare_finished() const { return recorded_as_prepare_finished_; } + OB_INLINE void set_type(TaskType type) { type_ = type; } + OB_INLINE TaskType get_type() const { return type_; } OB_INLINE void set_state(const ObTmpFileFlushTaskState state) { task_state_ = state; } OB_INLINE ObTmpFileFlushTaskState get_state() const { return task_state_; } OB_INLINE void set_tmp_file_block_handle(const ObTmpFileBlockHandle &tfb_handle) { tmp_file_block_handle_ = tfb_handle; } @@ -289,26 +310,39 @@ public: return buffer != nullptr && get_data_buf() != nullptr && buffer >= get_data_buf() && buffer + length <= get_data_buf() + OB_STORAGE_OBJECT_MGR.get_macro_object_size(); } - TO_STRING_KV(KP(this), KP(kvpair_), K(write_block_ret_code_), K(ret_code_), K(data_length_), + OB_INLINE void set_buffer_pool_ptr(ObTmpWriteBufferPool *wbp) { wbp_ = wbp; } + OB_INLINE void atomic_set_write_block_executed(const bool executed) { ATOMIC_SET(&is_write_block_executed_, executed); } + OB_INLINE bool atomic_get_write_block_executed() const { return ATOMIC_LOAD(&is_write_block_executed_); } + OB_INLINE ObTmpFileWriteBlockTask& get_flush_write_block_task() { 
return flush_write_block_task_; } + + TO_STRING_KV(KP(this), KP(kvpair_), K(io_result_ret_code_), K(data_length_), K(block_index_), K(flush_seq_), K(create_ts_), K(is_io_finished_), - K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_)); + K(fast_flush_tree_page_), K(recorded_as_prepare_finished_), K(type_), K(task_state_), K(tmp_file_block_handle_), K(flush_infos_), + K(is_write_block_executed_), K(write_block_ret_code_), K(flush_write_block_task_), K(flush_page_id_arr_.count())); +private: + int lazy_alloc_and_fill_block_buf_for_data_page_(); private: ObKVCacheInstHandle inst_handle_; ObKVCachePair *kvpair_; ObTmpBlockValueHandle block_handle_; + ObSEArray flush_page_id_arr_; int write_block_ret_code_; - int ret_code_; + int io_result_ret_code_; int64_t data_length_; // data length (including padding to make length upper align to page size) int64_t block_index_; // tmp file block logical index in ObTmpFileBlockManager int64_t flush_seq_; // flush sequence, for verification purpose int64_t create_ts_; - bool is_io_finished_; + bool is_write_block_executed_; // set to true if task has sent IO + bool is_io_finished_; // set to true if task has finished async IO bool fast_flush_tree_page_; // indicate the task requires fast flush tree pages - bool recorded_as_prepare_finished_; + bool recorded_as_prepare_finished_; // set to true if this task has been recorded by ObTmpFileBatchFlushContext as prepare_finished + TaskType type_; ObTmpFileFlushTaskState task_state_; ObTmpFileBlockHandle tmp_file_block_handle_;// hold a reference to the corresponding tmp file block to prevent it from being released blocksstable::ObMacroBlockHandle handle_; ObArray flush_infos_; // multi file flush into one block if size > 0 + ObTmpFileWriteBlockTask flush_write_block_task_; + ObTmpWriteBufferPool *wbp_; // use to lazy read data pages }; } // end namespace tmp_file diff --git 
a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp index 4359a86e1..c2932df31 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp +++ b/src/storage/tmp_file/ob_tmp_file_flush_manager.cpp @@ -43,6 +43,7 @@ int ObTmpFileFlushManager::init() } else if (OB_FAIL(flush_ctx_.init())) { STORAGE_LOG(WARN, "fail to init flush ctx", KR(ret)); } else { + cur_flush_timer_idx_ = 0; is_inited_ = true; } return ret; @@ -54,6 +55,13 @@ void ObTmpFileFlushManager::destroy() flush_ctx_.destroy(); } +void ObTmpFileFlushManager::set_flush_timer_tg_id(int* flush_timer_tg_id, const int64_t timer_cnt) +{ + for (int64_t i = 0; i < timer_cnt && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + flush_timer_tg_id_[i] = flush_timer_tg_id[i]; + } +} + int ObTmpFileFlushManager::alloc_flush_task(ObTmpFileFlushTask *&flush_task) { int ret = OB_SUCCESS; @@ -385,7 +393,9 @@ int ObTmpFileFlushManager::fill_block_buf_(ObTmpFileFlushTask &flush_task) // but is stuck in TFFT_INSERT_META_TREE state and new task has no meta pages to flush break; case FlushCtxState::FSM_F4: - if (!flush_task.is_full() && FlushCtxState::FSM_FINISHED != flush_ctx_.get_state() + if (OB_FAIL(flush_task.prealloc_block_buf())) { + STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task)); + } else if (!flush_task.is_full() && FlushCtxState::FSM_FINISHED != flush_ctx_.get_state() && OB_FAIL(inner_fill_block_buf_(flush_task, flush_ctx_.get_state(), true/*is_meta*/, false/*flush_tail*/))) { if (OB_ITER_END != ret) { @@ -707,7 +717,7 @@ int ObTmpFileFlushManager::drive_flush_task_retry_( return ret; } -int ObTmpFileFlushManager::drive_flush_task_wait_to_finish_(ObTmpFileFlushTask &flush_task, FlushState &next_state) +int ObTmpFileFlushManager::drive_flush_task_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state) { int ret = OB_SUCCESS; ObTmpFileFlushTask::ObTmpFileFlushTaskState state = flush_task.get_state(); @@ -719,7 +729,7 @@ int 
ObTmpFileFlushManager::drive_flush_task_wait_to_finish_(ObTmpFileFlushTask & break; default: ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "unexpected state in drive_flush_task_wait_to_finish_", K(state)); + STORAGE_LOG(WARN, "unexpected state in drive_flush_task_wait_", K(state)); break; } return ret; @@ -766,7 +776,7 @@ int ObTmpFileFlushManager::io_finished(ObTmpFileFlushTask &flush_task) { int ret = OB_SUCCESS; FlushState next_state = FlushState::TFFT_INITED; - if (OB_FAIL(drive_flush_task_wait_to_finish_(flush_task, next_state))) { + if (OB_FAIL(drive_flush_task_wait_(flush_task, next_state))) { STORAGE_LOG(WARN, "fail to drive flush state machine to FINISHED", KR(ret), K(flush_task)); } else if (flush_task.get_state() < next_state && OB_FAIL(advance_status_(flush_task, next_state))) { // if the task encounters an IO error, its status will silently revert to TFFT_ASYNC_WRITE; do not verify status here. @@ -843,14 +853,17 @@ int ObTmpFileFlushManager::handle_create_block_index_(ObTmpFileFlushTask &flush_ return ret; } +// For performance reasons, we have delayed the memory allocation and data copying of the flush data task +// until the TFFT_ASYNC_WRITE stage, distributing tasks across 4 timer threads for execution; +// flush meta task has fewer occurrences, so we allocate and copy memory directly in the current thread. 
int ObTmpFileFlushManager::handle_fill_block_buf_(ObTmpFileFlushTask &flush_task, FlushState &next_state) { int ret = OB_SUCCESS; int tmp_ret = OB_SUCCESS; - if (OB_FAIL(flush_task.prealloc_block_buf())) { - STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task)); - } else if (flush_task.get_is_fast_flush_tree()) { // skip flush level, copy meta tree pages directly - if (OB_FAIL(fast_fill_block_buf_with_meta_(flush_task))) { + if (flush_task.get_is_fast_flush_tree()) { // skip flush level, copy meta tree pages directly + if (OB_FAIL(flush_task.prealloc_block_buf())) { + STORAGE_LOG(WARN, "fail to prealloc block buf", KR(ret), K(flush_task)); + } else if (OB_FAIL(fast_fill_block_buf_with_meta_(flush_task))) { STORAGE_LOG(WARN, "fail to fill block buffer with meta", KR(ret), K(flush_task)); } } else { @@ -874,25 +887,19 @@ int ObTmpFileFlushManager::handle_fill_block_buf_(ObTmpFileFlushTask &flush_task flush_task.get_flush_infos().at(i).has_meta())) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(ERROR, "flush info has both data and meta", KR(ret), K(flush_task)); - } else if (OB_UNLIKELY((flush_task.get_flush_infos().at(0).has_data() != - flush_task.get_flush_infos().at(i).has_data()) || - (flush_task.get_flush_infos().at(0).has_meta() != - flush_task.get_flush_infos().at(i).has_meta()))) { + } else if (OB_UNLIKELY(ObTmpFileFlushTask::DATA == flush_task.get_type() && + flush_task.get_flush_infos().at(i).has_meta())) { ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(ERROR, "flush infos mixed storage meta and data pages", KR(ret), K(flush_task)); + STORAGE_LOG(ERROR, "flush infos contain unexpected page type", KR(ret), K(flush_task)); + } else if (OB_UNLIKELY(ObTmpFileFlushTask::META == flush_task.get_type() && + flush_task.get_flush_infos().at(i).has_data())) { + ret = OB_ERR_UNEXPECTED; + STORAGE_LOG(ERROR, "flush infos contain unexpected page type", KR(ret), K(flush_task)); } } } if (OB_SUCC(ret)) { - const bool is_whole_data_page = 
flush_task.get_flush_infos().at(0).has_data(); - if (is_whole_data_page && - OB_TMP_FAIL(ObTmpBlockCache::get_instance().put_block(flush_task.get_inst_handle(), - flush_task.get_kvpair(), - flush_task.get_block_handle()))) { - STORAGE_LOG(WARN, "fail to put block into block cache", KR(tmp_ret), K(flush_task)); - } - int64_t used_page_num = flush_task.get_total_page_num(); int64_t unused_page_id = used_page_num; int64_t unused_page_num = ObTmpFileGlobal::BLOCK_PAGE_NUMS - used_page_num; @@ -927,21 +934,25 @@ int ObTmpFileFlushManager::handle_insert_meta_tree_(ObTmpFileFlushTask &flush_ta int ObTmpFileFlushManager::handle_async_write_(ObTmpFileFlushTask &flush_task, FlushState &next_state) { int ret = OB_SUCCESS; - if (OB_FAIL(flush_task.write_one_block())) { - STORAGE_LOG(WARN, "fail to async write blocks", KR(ret), K(flush_task)); + cur_flush_timer_idx_ = (cur_flush_timer_idx_ + 1) % ObTmpFileGlobal::FLUSH_TIMER_CNT; + if (OB_FAIL(TG_SCHEDULE(flush_timer_tg_id_[cur_flush_timer_idx_], flush_task.get_flush_write_block_task(), 0/*delay*/, false/*repeat*/))) { + LOG_WARN("TG_SCHEDULE tmp file write block task failed", KR(ret), K(flush_timer_tg_id_[cur_flush_timer_idx_]), K(cur_flush_timer_idx_), K(flush_task)); } else { next_state = FlushState::TFFT_WAIT; } + return ret; } int ObTmpFileFlushManager::handle_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state) { int ret = OB_SUCCESS; + int write_block_ret_code = flush_task.atomic_get_write_block_ret_code(); int task_ret_code = flush_task.atomic_get_ret_code(); - if (OB_SUCCESS != task_ret_code) { + if (OB_SUCCESS != write_block_ret_code || OB_SUCCESS != task_ret_code) { // rollback the status to TFFT_ASYNC_WRITE if IO failed, and re-send the I/O in the retry process. 
- STORAGE_LOG(INFO, "flush_task io fail, retry it later", KR(task_ret_code), K(flush_task)); + STORAGE_LOG(INFO, "flush_task io fail, retry it later", + KR(write_block_ret_code), KR(task_ret_code), K(flush_task)); flush_task.set_state(FlushState::TFFT_ASYNC_WRITE); } else if (OB_FAIL(tmp_file_block_mgr_.write_back_succ(flush_task.get_block_index(), flush_task.get_macro_block_handle().get_macro_id()))) { diff --git a/src/storage/tmp_file/ob_tmp_file_flush_manager.h b/src/storage/tmp_file/ob_tmp_file_flush_manager.h index feba4f437..cb79fd25a 100644 --- a/src/storage/tmp_file/ob_tmp_file_flush_manager.h +++ b/src/storage/tmp_file/ob_tmp_file_flush_manager.h @@ -67,6 +67,7 @@ public: ~ObTmpFileFlushManager() {} int init(); void destroy(); + void set_flush_timer_tg_id(int* flush_timer_tg_id, const int64_t timer_cnt); TO_STRING_KV(K(is_inited_), K(flush_ctx_)); public: @@ -99,7 +100,7 @@ private: int advance_status_(ObTmpFileFlushTask &flush_task, const FlushState &state); int drive_flush_task_prepare_(ObTmpFileFlushTask &flush_task, const FlushState state, FlushState &next_state); int drive_flush_task_retry_(ObTmpFileFlushTask &flush_task, const FlushState state, FlushState &next_state); - int drive_flush_task_wait_to_finish_(ObTmpFileFlushTask &flush_task, FlushState &next_state); + int drive_flush_task_wait_(ObTmpFileFlushTask &flush_task, FlushState &next_state); int handle_alloc_flush_task_(const bool fast_flush_meta, ObTmpFileFlushTask *&flush_task); int handle_create_block_index_(ObTmpFileFlushTask &flush_task, FlushState &next_state); int handle_fill_block_buf_(ObTmpFileFlushTask &flush_task, FlushState &next_state); @@ -125,6 +126,8 @@ private: ObTmpWriteBufferPool &write_buffer_pool_; ObTmpFileEvictionManager &evict_mgr_; ObTmpFileFlushPriorityManager &flush_priority_mgr_; + int32_t cur_flush_timer_idx_; + int flush_timer_tg_id_[ObTmpFileGlobal::FLUSH_TIMER_CNT]; }; } // end namespace tmp_file diff --git a/src/storage/tmp_file/ob_tmp_file_global.h 
b/src/storage/tmp_file/ob_tmp_file_global.h index 6d40683b4..3328df924 100644 --- a/src/storage/tmp_file/ob_tmp_file_global.h +++ b/src/storage/tmp_file/ob_tmp_file_global.h @@ -51,6 +51,7 @@ struct ObTmpFileGlobal final FSM_FINISHED = 5 }; static const int64_t INVALID_FLUSH_SEQUENCE = -1; + static const int32_t FLUSH_TIMER_CNT = 4; }; diff --git a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp index 767b4a173..ac48d0c62 100644 --- a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp +++ b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.cpp @@ -51,6 +51,8 @@ int ObTmpFilePageCacheController::start() int ret = OB_SUCCESS; if (IS_NOT_INIT) { STORAGE_LOG(WARN, "tmp file page cache controller is not inited"); + } else if (OB_FAIL(flush_tg_.start())) { + STORAGE_LOG(WARN, "fail to start flush thread", KR(ret)); } else if (OB_FAIL(swap_tg_.start())) { STORAGE_LOG(WARN, "fail to start swap thread", KR(ret)); } @@ -65,6 +67,7 @@ void ObTmpFilePageCacheController::stop() } else { // stop background threads should follow the order 'swap' -> 'flush' because 'swap' holds ref to 'flush' swap_tg_.stop(); + flush_tg_.stop(); } } @@ -75,6 +78,7 @@ void ObTmpFilePageCacheController::wait() STORAGE_LOG(WARN, "tmp file page cache controller is not inited"); } else { swap_tg_.wait(); + flush_tg_.wait(); } } diff --git a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.h b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.h index 92dffc9cf..4be0bc833 100644 --- a/src/storage/tmp_file/ob_tmp_file_page_cache_controller.h +++ b/src/storage/tmp_file/ob_tmp_file_page_cache_controller.h @@ -40,6 +40,7 @@ public: } ~ObTmpFilePageCacheController() {} public: + static const int64_t FLUSH_TIMER_CNT = 4; static const int64_t FLUSH_FAST_INTERVAL = 5; // 5ms static const int64_t FLUSH_INTERVAL = 1000; // 1s static const int64_t SWAP_FAST_INTERVAL = 5; // 5ms diff --git
a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp index 51c656346..3ced7c027 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.cpp @@ -51,6 +51,9 @@ ObTmpFileFlushTG::ObTmpFileFlushTG( fast_loop_cnt_(0), fast_idle_loop_cnt_(0) { + for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + flush_timer_tg_id_[i] = -1; + } } int ObTmpFileFlushTG::init() @@ -60,48 +63,97 @@ int ObTmpFileFlushTG::init() ret = OB_INIT_TWICE; STORAGE_LOG(WARN, "ObTmpFileSwapTG init twice"); } else { - is_inited_ = true; - mode_ = RUNNING_MODE::NORMAL; - last_flush_timestamp_ = 0; - flush_io_finished_ret_ = OB_SUCCESS; - flush_io_finished_round_ = 0; - flushing_block_num_ = 0; + for (int32_t i = 0; OB_SUCC(ret) && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + if (OB_FAIL(TG_CREATE_TENANT(lib::TGDefIDs::TmpFileFlush, flush_timer_tg_id_[i]))) { + STORAGE_LOG(WARN, "fail to create flush timer thread", KR(ret)); + } + } + if (OB_SUCC(ret)) { + is_inited_ = true; + mode_ = RUNNING_MODE::NORMAL; + last_flush_timestamp_ = 0; + flush_io_finished_ret_ = OB_SUCCESS; + flush_io_finished_round_ = 0; + flushing_block_num_ = 0; - fast_flush_meta_task_cnt_ = 0; - wait_list_size_ = 0; - retry_list_size_ = 0; - finished_list_size_ = 0; + fast_flush_meta_task_cnt_ = 0; + wait_list_size_ = 0; + retry_list_size_ = 0; + finished_list_size_ = 0; - normal_loop_cnt_ = 0; - normal_idle_loop_cnt_ = 0; - fast_loop_cnt_ = 0; - fast_idle_loop_cnt_ = 0; + normal_loop_cnt_ = 0; + normal_idle_loop_cnt_ = 0; + fast_loop_cnt_ = 0; + fast_idle_loop_cnt_ = 0; + } + } + + if (OB_FAIL(ret)) { + destroy(); } return ret; } +int ObTmpFileFlushTG::start() +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + STORAGE_LOG(WARN, "ObTmpFileSwapTG not init", KR(ret)); + } else { + for (int32_t i = 0; OB_SUCC(ret) && i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + if 
(OB_FAIL(TG_START(flush_timer_tg_id_[i]))) { + LOG_WARN("TG_START flush_timer_tg_id_ failed", KR(ret), K(flush_timer_tg_id_[i])); + } + } + + if (OB_SUCC(ret)) { + flush_mgr_.set_flush_timer_tg_id(flush_timer_tg_id_, ObTmpFileGlobal::FLUSH_TIMER_CNT); + } + } + return ret; +} + +void ObTmpFileFlushTG::stop() +{ + for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + TG_STOP(flush_timer_tg_id_[i]); + } +} + +void ObTmpFileFlushTG::wait() +{ + for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + TG_WAIT(flush_timer_tg_id_[i]); + } +} + void ObTmpFileFlushTG::destroy() { - if (IS_INIT) { - clean_up_lists(); - mode_ = RUNNING_MODE::INVALID; - last_flush_timestamp_ = 0; - flush_io_finished_ret_ = OB_SUCCESS; - flush_io_finished_round_ = 0; - flushing_block_num_ = 0; + clean_up_lists(); + mode_ = RUNNING_MODE::INVALID; + last_flush_timestamp_ = 0; + flush_io_finished_ret_ = OB_SUCCESS; + flush_io_finished_round_ = 0; + flushing_block_num_ = 0; - is_fast_flush_meta_ = false; - fast_flush_meta_task_cnt_ = 0; - wait_list_size_ = 0; - retry_list_size_ = 0; - finished_list_size_ = 0; + is_fast_flush_meta_ = false; + fast_flush_meta_task_cnt_ = 0; + wait_list_size_ = 0; + retry_list_size_ = 0; + finished_list_size_ = 0; - normal_loop_cnt_ = 0; - normal_idle_loop_cnt_ = 0; - fast_loop_cnt_ = 0; - fast_idle_loop_cnt_ = 0; + normal_loop_cnt_ = 0; + normal_idle_loop_cnt_ = 0; + fast_loop_cnt_ = 0; + fast_idle_loop_cnt_ = 0; - is_inited_ = false; + is_inited_ = false; + for (int32_t i = 0; i < ObTmpFileGlobal::FLUSH_TIMER_CNT; ++i) { + if (-1 != flush_timer_tg_id_[i]) { + TG_DESTROY(flush_timer_tg_id_[i]); + flush_timer_tg_id_[i] = -1; + } } } @@ -260,19 +312,27 @@ void ObTmpFileFlushTG::flush_fast_() { int ret = OB_SUCCESS; int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size(); - int64_t flush_size = min(get_fast_flush_size_(), get_flushing_block_num_threshold_() * BLOCK_SIZE); + if (OB_FAIL(check_flush_task_io_finished_())) { 
STORAGE_LOG(WARN, "fail to check flush task io finished", KR(ret)); } if (OB_FAIL(retry_task_())) { STORAGE_LOG(WARN, "fail to retry task", KR(ret)); } - if (flush_size > 0) { - if (OB_FAIL(wash_(flush_size, RUNNING_MODE::FAST))) { - STORAGE_LOG(WARN, "fail to flush fast", KR(ret), KPC(this), K(flush_size)); - } + + int64_t flushing_block_num = ATOMIC_LOAD(&flushing_block_num_); + if (flushing_block_num >= get_flushing_block_num_threshold_()) { + STORAGE_LOG(DEBUG, "reach flushing block num threshold, skip flush", KPC(this)); } else { - STORAGE_LOG(DEBUG, "current expect flush size is 0, skip flush", K(flush_size), K(this)); + int64_t max_flushing_block_num_cur_round = get_flushing_block_num_threshold_() - flushing_block_num; + int64_t flush_size = min(get_fast_flush_size_(), max_flushing_block_num_cur_round * BLOCK_SIZE); + if (flush_size > 0) { + if (OB_FAIL(wash_(flush_size, RUNNING_MODE::FAST))) { + STORAGE_LOG(WARN, "fail to flush fast", KR(ret), KPC(this), K(flush_size)); + } + } else { + STORAGE_LOG(DEBUG, "current expect flush size is 0, skip flush", K(flush_size), KPC(this)); + } } } @@ -481,12 +541,36 @@ int ObTmpFileFlushTG::check_flush_task_io_finished_() if (OB_ISNULL(flush_task)) { ret = OB_ERR_UNEXPECTED; STORAGE_LOG(WARN, "flush task is nullptr", KR(ret)); + } + bool write_block_success = false; + if (OB_FAIL(ret)) { + } else if (!flush_task->atomic_get_write_block_executed()) { + push_wait_list_(flush_task); // not send IO yet, continue waiting + ret = OB_SUCCESS; + } else if (flush_task->atomic_get_write_block_ret_code() != OB_SUCCESS) { + if (flush_task->atomic_get_write_block_ret_code() == OB_SERVER_OUTOF_DISK_SPACE) { + signal_io_finish(OB_SERVER_OUTOF_DISK_SPACE); + } + // rollback to TFFT_ASYNC_WRITE and re-send IO + if (OB_FAIL(flush_mgr_.io_finished(*flush_task))) { + STORAGE_LOG(WARN, "fail to handle flush task finished", KR(ret), KPC(flush_task)); + } else if (FlushState::TFFT_ASYNC_WRITE == flush_task->get_state()) { + 
push_retry_list_(flush_task); + STORAGE_LOG(DEBUG, "write block failure flush task push to retry list", KPC(flush_task)); + } else { + STORAGE_LOG(ERROR, "unexpected flush task state", KR(ret), KPC(flush_task)); + } + } else { + write_block_success = true; + } + + if (OB_FAIL(ret) || !write_block_success) { } else if (OB_FAIL(flush_task->wait_macro_block_handle())) { if (OB_EAGAIN == ret) { push_wait_list_(flush_task); // IO is not completed, continue waiting ret = OB_SUCCESS; } else { - STORAGE_LOG(ERROR, "unexpected error in waiting flush task finished", KR(ret), KPC(this)); + STORAGE_LOG(WARN, "unexpected error in waiting flush task finished", KR(ret), KPC(this)); } } else if (!flush_task->atomic_get_io_finished()) { ret = OB_ERR_UNEXPECTED; @@ -634,22 +718,21 @@ int ObTmpFileFlushTG::pop_finished_list_(ObTmpFileFlushTask *&flush_task) return ret; } -// fast mode flush size is max(2MB, min(5% * tmp_file_memory,30MB)) int ObTmpFileFlushTG::get_fast_flush_size_() { // TODO: move to page cache controller const int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size(); int64_t wbp_mem_limit = wbp_.get_memory_limit(); - int64_t flush_size = max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM * BLOCK_SIZE, upper_align(0.05 * wbp_mem_limit, BLOCK_SIZE))); + int64_t flush_size = max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM * BLOCK_SIZE, upper_align(0.1 * wbp_mem_limit, BLOCK_SIZE))); return flush_size; } -// flushing threshold is MIN(20MB, (20% * tmp_file_memory)) -int ObTmpFileFlushTG::get_flushing_block_num_threshold_() +int64_t ObTmpFileFlushTG::get_flushing_block_num_threshold_() { const int64_t BLOCK_SIZE = OB_STORAGE_OBJECT_MGR.get_macro_object_size(); int64_t wbp_mem_limit = wbp_.get_memory_limit(); - int64_t flush_threshold = max(BLOCK_SIZE, min(MAX_FLUSHING_BLOCK_NUM, static_cast(0.2 * wbp_mem_limit / BLOCK_SIZE))); + int64_t flush_threshold = + max(1, min(MAX_FLUSHING_BLOCK_NUM, static_cast(0.2 * wbp_mem_limit / BLOCK_SIZE))); return flush_threshold; } diff 
--git a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h index cfff4d380..a8d67cf30 100644 --- a/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h +++ b/src/storage/tmp_file/ob_tmp_file_thread_wrapper.h @@ -33,7 +33,7 @@ class ObTmpFileFlushTG { public: typedef ObTmpFileFlushTask::ObTmpFileFlushTaskState FlushState; - static const int64_t MAX_FLUSHING_BLOCK_NUM = 50; + static const int64_t MAX_FLUSHING_BLOCK_NUM = 200; enum RUNNING_MODE { INVALID = 0, NORMAL = 1, @@ -45,6 +45,9 @@ public: ObIAllocator &allocator, ObTmpFileBlockManager &tmp_file_block_mgr); int init(); + int start(); + void stop(); + void wait(); void destroy(); int try_work(); @@ -71,7 +74,7 @@ private: void flush_fast_(); void flush_normal_(); int get_fast_flush_size_(); - int get_flushing_block_num_threshold_(); + int64_t get_flushing_block_num_threshold_(); int push_wait_list_(ObTmpFileFlushTask *flush_task); int pop_wait_list_(ObTmpFileFlushTask *&flush_task); int push_retry_list_(ObTmpFileFlushTask *flush_task); @@ -104,6 +107,8 @@ private: int64_t normal_idle_loop_cnt_; int64_t fast_loop_cnt_; int64_t fast_idle_loop_cnt_; + + int flush_timer_tg_id_[ObTmpFileGlobal::FLUSH_TIMER_CNT]; }; class ObTmpFileSwapTG : public lib::TGRunnable