Fix shared block write start from offset 0

This commit is contained in:
JiahuaChen
2024-01-18 08:47:35 +00:00
committed by ob-robot
parent f926b7abf7
commit a9e768cd1d
2 changed files with 48 additions and 30 deletions

View File

@ -79,7 +79,6 @@ ObSharedMacroBlockMgr::ObSharedMacroBlockMgr()
blocks_mutex_(),
block_used_size_(),
defragmentation_task_(*this),
io_allocator_("SMBM_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()),
tg_id_(-1),
is_inited_(false)
{
@ -211,9 +210,14 @@ int ObSharedMacroBlockMgr::write_block(
if (OB_SUCC(ret)) {
write_info.offset_ = offset_;
write_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS);
if (OB_FAIL(do_write_block(write_info, block_info))) {
if (OB_FAIL(do_write_block(macro_handle_.get_macro_id(), write_info, block_info))) {
LOG_WARN("fail to do write block", K(ret), K(write_info), K(block_info));
} else {
}
// no matter success or failure, advance offset_
offset_ += size;
if (OB_SUCC(ret)) {
FLOG_INFO("successfully write small sstable",
K(ret), K(block_info), K(offset_), "old_block", write_ctx.get_macro_block_list());
write_ctx.reset();
@ -228,57 +232,59 @@ int ObSharedMacroBlockMgr::write_block(
return ret;
}
int ObSharedMacroBlockMgr::do_write_block(
/*static*/int ObSharedMacroBlockMgr::do_write_block(
const MacroBlockId& macro_id,
const ObMacroBlockWriteInfo &write_info,
ObBlockInfo &block_info)
{
int ret = OB_SUCCESS;
ObMacroBlockHandle write_macro_handle;
const int64_t offset = write_info.offset_;
const int64_t size = write_info.size_;
if (OB_FAIL(write_macro_handle.set_macro_block_id(macro_handle_.get_macro_id()))) {
LOG_WARN("fail to set macro block id", K(ret), K(macro_handle_.get_macro_id()));
if (OB_FAIL(write_macro_handle.set_macro_block_id(macro_id))) {
LOG_WARN("fail to set macro block id", K(ret), K(macro_id));
} else if (OB_FAIL(write_macro_handle.async_write(write_info))) {
LOG_WARN("fail to async write virtual macro block", K(ret), K(write_macro_handle));
} else if (OB_FAIL(write_macro_handle.wait())) {
LOG_WARN("fail to wait previous io", K(ret), K(write_info));
}
if (OB_TIMEOUT == ret) {
// although ret is timeout, file system may be still writing, so we should skip the offset
offset_ += write_info.size_;
}
if (OB_SUCC(ret) && !write_macro_handle.is_empty() && MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE ==
GCONF.micro_block_merge_verify_level && 0 != offset_) {
if (OB_FAIL(check_write_complete(write_macro_handle.get_macro_id(), write_info.size_))) {
if (OB_SUCC(ret)
&& !write_macro_handle.is_empty()
&& MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE == GCONF.micro_block_merge_verify_level
&& 0 != offset) {
if (OB_FAIL(check_write_complete(write_macro_handle.get_macro_id(), offset, size))) {
LOG_WARN("fail to check write completion", K(ret));
}
}
if (OB_SUCC(ret)) {
block_info.macro_id_ = write_macro_handle.get_macro_id();
block_info.nested_size_ = write_info.size_;
block_info.nested_offset_ = offset_;
offset_ += write_info.size_;
block_info.macro_id_ = macro_id;
block_info.nested_size_ = size;
block_info.nested_offset_ = offset;
}
return ret;
}
int ObSharedMacroBlockMgr::check_write_complete(const MacroBlockId &macro_id, const int64_t macro_size)
/*static*/int ObSharedMacroBlockMgr::check_write_complete(
const MacroBlockId &macro_id,
const int64_t offset,
const int64_t size)
{
int ret = OB_SUCCESS;
ObMacroBlockReadInfo read_info;
read_info.macro_block_id_ = macro_id;
read_info.size_ = macro_size;
read_info.offset_ = offset_;
read_info.size_ = size;
read_info.offset_ = offset;
read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ);
read_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS);
read_info.io_desc_.set_group_id(ObIOModule::SHARED_MACRO_BLOCK_MGR_IO);
ObMacroBlockHandle read_handle;
ObSSTableMacroBlockChecker macro_block_checker;
io_allocator_.reuse();
ObArenaAllocator io_allocator("SMBM_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID());
if (OB_ISNULL(read_info.buf_ = reinterpret_cast<char*>(io_allocator_.alloc(read_info.size_)))) {
if (OB_ISNULL(read_info.buf_ = reinterpret_cast<char*>(io_allocator.alloc(read_info.size_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "failed to alloc macro read info buffer", K(ret), K(read_info.size_));
} else {
@ -301,15 +307,15 @@ int ObSharedMacroBlockMgr::try_switch_macro_block()
int ret = OB_SUCCESS;
const MacroBlockId &block_id = macro_handle_.get_macro_id();
const int32_t used_size = offset_;
ObMacroBlockHandle new_macro_handle;
// we add_block_size extraly to avoid defragmenting the previous block if some sstables haven't been inited
if (block_id.is_valid() && OB_FAIL(add_block(block_id, used_size))) {
LOG_WARN("fail to add cur block to map", K(ret), K(block_id));
} else if (FALSE_IT(macro_handle_.reset())) {
} else if (FALSE_IT(offset_ = OB_DEFAULT_MACRO_BLOCK_SIZE /* invalid offset */)) {
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.alloc_block(macro_handle_))) {
} else if (OB_FAIL(OB_SERVER_BLOCK_MGR.alloc_block(new_macro_handle))) {
LOG_WARN("fail to alloc block for new macro block", K(ret));
} else {
offset_ = 0;
ObMacroBlockWriteInfo write_info;
ObBlockInfo block_info;
write_info.buffer_ = common_header_buf_;
@ -317,8 +323,15 @@ int ObSharedMacroBlockMgr::try_switch_macro_block()
write_info.offset_ = 0;
write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE);
write_info.io_timeout_ms_ = std::max(GCONF._data_storage_io_timeout / 1000, DEFAULT_IO_WAIT_TIME_MS);
if (OB_FAIL(do_write_block(write_info, block_info))) {
LOG_WARN("fail to write common header to the shared macro block", K(ret), K(block_info));
if (OB_FAIL(do_write_block(new_macro_handle.get_macro_id(), write_info, block_info))) {
LOG_WARN("fail to write common header to the shared macro block", K(ret), K(new_macro_handle), K(write_info), K(block_info));
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(macro_handle_.set_macro_block_id(new_macro_handle.get_macro_id()))) {
LOG_WARN("fail to set new macro id", K(ret), K(new_macro_handle));
} else {
offset_ = header_size_;
}
}

View File

@ -172,8 +172,14 @@ private:
ObSSTable &new_sstable) const;
int parse_merge_type(const ObSSTable &sstable, compaction::ObMergeType &merge_type) const;
int try_switch_macro_block();
int check_write_complete(const MacroBlockId &macro_id, const int64_t macro_size);
int do_write_block(const ObMacroBlockWriteInfo &write_info, ObBlockInfo &block_info);
static int check_write_complete(
const MacroBlockId &macro_id,
const int64_t offset,
const int64_t size);
static int do_write_block(
const MacroBlockId& macro_id,
const ObMacroBlockWriteInfo &write_info,
ObBlockInfo &block_info);
DISALLOW_COPY_AND_ASSIGN(ObSharedMacroBlockMgr);
private:
@ -192,7 +198,6 @@ private:
lib::ObMutex blocks_mutex_; // protect block_used_size_
ObLinearHashMap<MacroBlockId, int32_t> block_used_size_;
ObBlockDefragmentationTask defragmentation_task_;
common::ObArenaAllocator io_allocator_;
int tg_id_;
bool is_inited_;
};