From ca882ae8b4cd226e83a12fe32c9b2fcb59d60d3d Mon Sep 17 00:00:00 2001 From: JiahuaChen Date: Fri, 9 Feb 2024 16:42:23 +0000 Subject: [PATCH] Fix shared block write start from offset 0 --- .../ob_shared_macro_block_manager.cpp | 67 +++++++++++-------- .../ob_shared_macro_block_manager.h | 11 ++- 2 files changed, 48 insertions(+), 30 deletions(-) diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp index 0bd7a9709c..c013206911 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.cpp +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.cpp @@ -79,7 +79,6 @@ ObSharedMacroBlockMgr::ObSharedMacroBlockMgr() blocks_mutex_(), block_used_size_(), defragmentation_task_(*this), - io_allocator_("SMBM_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), tg_id_(-1), is_inited_(false) { @@ -211,9 +210,14 @@ int ObSharedMacroBlockMgr::write_block( if (OB_SUCC(ret)) { write_info.offset_ = offset_; write_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS); - if (OB_FAIL(do_write_block(write_info, block_info))) { + if (OB_FAIL(do_write_block(macro_handle_.get_macro_id(), write_info, block_info))) { LOG_WARN("fail to do write block", K(ret), K(write_info), K(block_info)); - } else { + } + + // no matter success or failure, advance offset_ + offset_ += size; + + if (OB_SUCC(ret)) { FLOG_INFO("successfully write small sstable", K(ret), K(block_info), K(offset_), "old_block", write_ctx.get_macro_block_list()); write_ctx.reset(); @@ -228,57 +232,59 @@ int ObSharedMacroBlockMgr::write_block( return ret; } -int ObSharedMacroBlockMgr::do_write_block( +/*static*/int ObSharedMacroBlockMgr::do_write_block( + const MacroBlockId& macro_id, const ObMacroBlockWriteInfo &write_info, ObBlockInfo &block_info) { int ret = OB_SUCCESS; ObMacroBlockHandle write_macro_handle; + const int64_t offset = write_info.offset_; + const int64_t size = write_info.size_; - if (OB_FAIL(write_macro_handle.set_macro_block_id(macro_handle_.get_macro_id()))) { - LOG_WARN("fail to set macro block id", K(ret), K(macro_handle_.get_macro_id())); + if (OB_FAIL(write_macro_handle.set_macro_block_id(macro_id))) { + LOG_WARN("fail to set macro block id", K(ret), K(macro_id)); } else if (OB_FAIL(write_macro_handle.async_write(write_info))) { LOG_WARN("fail to async write virtual macro block", K(ret), K(write_macro_handle)); } else if (OB_FAIL(write_macro_handle.wait())) { LOG_WARN("fail to wait previous io", K(ret), K(write_info)); } - if (OB_TIMEOUT == ret) { - // although ret is timeout, file system may be still writing, so we should skip the offset - offset_ += write_info.size_; - } - - if (OB_SUCC(ret) && !write_macro_handle.is_empty() && MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE == - GCONF.micro_block_merge_verify_level && 0 != offset_) { - if (OB_FAIL(check_write_complete(write_macro_handle.get_macro_id(), write_info.size_))) { + if (OB_SUCC(ret) + && !write_macro_handle.is_empty() + && MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE == GCONF.micro_block_merge_verify_level + && 0 != offset) { + if (OB_FAIL(check_write_complete(write_macro_handle.get_macro_id(), offset, size))) { LOG_WARN("fail to check write completion", K(ret)); } } if (OB_SUCC(ret)) { - block_info.macro_id_ = write_macro_handle.get_macro_id(); - block_info.nested_size_ = write_info.size_; - block_info.nested_offset_ = offset_; - offset_ += write_info.size_; + block_info.macro_id_ = macro_id; + block_info.nested_size_ = size; + block_info.nested_offset_ = offset; } return ret; } -int ObSharedMacroBlockMgr::check_write_complete(const MacroBlockId ¯o_id, const int64_t macro_size) +/*static*/int ObSharedMacroBlockMgr::check_write_complete( + const MacroBlockId ¯o_id, + const int64_t offset, + const int64_t size) { int ret = OB_SUCCESS; ObMacroBlockReadInfo read_info; read_info.macro_block_id_ = macro_id; - read_info.size_ = macro_size; - read_info.offset_ = offset_; + read_info.size_ = size; + read_info.offset_ = offset; read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ); read_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS); read_info.io_desc_.set_group_id(ObIOModule::SHARED_MACRO_BLOCK_MGR_IO); ObMacroBlockHandle read_handle; ObSSTableMacroBlockChecker macro_block_checker; - io_allocator_.reuse(); + ObArenaAllocator io_allocator("SMBM_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()); - if (OB_ISNULL(read_info.buf_ = reinterpret_cast(io_allocator_.alloc(read_info.size_)))) { + if (OB_ISNULL(read_info.buf_ = reinterpret_cast(io_allocator.alloc(read_info.size_)))) { ret = OB_ALLOCATE_MEMORY_FAILED; STORAGE_LOG(WARN, "failed to alloc macro read info buffer", K(ret), K(read_info.size_)); } else { @@ -301,15 +307,15 @@ int ObSharedMacroBlockMgr::try_switch_macro_block() int ret = OB_SUCCESS; const MacroBlockId &block_id = macro_handle_.get_macro_id(); const int32_t used_size = offset_; + ObMacroBlockHandle new_macro_handle; // we add_block_size extraly to avoid defragmenting the previous block if some sstables haven't been inited if (block_id.is_valid() && OB_FAIL(add_block(block_id, used_size))) { LOG_WARN("fail to add cur block to map", K(ret), K(block_id)); } else if (FALSE_IT(macro_handle_.reset())) { } else if (FALSE_IT(offset_ = OB_DEFAULT_MACRO_BLOCK_SIZE /* invalid offset */)) { - } else if (OB_FAIL(OB_SERVER_BLOCK_MGR.alloc_block(macro_handle_))) { + } else if (OB_FAIL(OB_SERVER_BLOCK_MGR.alloc_block(new_macro_handle))) { LOG_WARN("fail to alloc block for new macro block", K(ret)); } else { - offset_ = 0; ObMacroBlockWriteInfo write_info; ObBlockInfo block_info; write_info.buffer_ = common_header_buf_; @@ -317,8 +323,15 @@ int ObSharedMacroBlockMgr::try_switch_macro_block() write_info.offset_ = 0; write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE); write_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS); - if (OB_FAIL(do_write_block(write_info, block_info))) { - LOG_WARN("fail to write common header to the shared macro block", K(ret), K(block_info)); + if (OB_FAIL(do_write_block(new_macro_handle.get_macro_id(), write_info, block_info))) { + LOG_WARN("fail to write common header to the shared macro block", K(ret), K(new_macro_handle), K(write_info), K(block_info)); + } + } + if (OB_SUCC(ret)) { + if (OB_FAIL(macro_handle_.set_macro_block_id(new_macro_handle.get_macro_id()))) { + LOG_WARN("fail to set new macro id", K(ret), K(new_macro_handle)); + } else { + offset_ = header_size_; } } diff --git a/src/storage/blocksstable/ob_shared_macro_block_manager.h b/src/storage/blocksstable/ob_shared_macro_block_manager.h index 2d19918ec1..3764b25d51 100644 --- a/src/storage/blocksstable/ob_shared_macro_block_manager.h +++ b/src/storage/blocksstable/ob_shared_macro_block_manager.h @@ -172,8 +172,14 @@ private: ObSSTable &new_sstable) const; int parse_merge_type(const ObSSTable &sstable, compaction::ObMergeType &merge_type) const; int try_switch_macro_block(); - int check_write_complete(const MacroBlockId ¯o_id, const int64_t macro_size); - int do_write_block(const ObMacroBlockWriteInfo &write_info, ObBlockInfo &block_info); + static int check_write_complete( + const MacroBlockId ¯o_id, + const int64_t offset, + const int64_t size); + static int do_write_block( + const MacroBlockId& macro_id, + const ObMacroBlockWriteInfo &write_info, + ObBlockInfo &block_info); DISALLOW_COPY_AND_ASSIGN(ObSharedMacroBlockMgr); private: @@ -192,7 +198,6 @@ private: lib::ObMutex blocks_mutex_; // protect block_used_size_ ObLinearHashMap block_used_size_; ObBlockDefragmentationTask defragmentation_task_; - common::ObArenaAllocator io_allocator_; int tg_id_; bool is_inited_; };