diff --git a/src/storage/blocksstable/ob_bloom_filter_data_writer.cpp b/src/storage/blocksstable/ob_bloom_filter_data_writer.cpp index 4c026de935..04c3261bce 100644 --- a/src/storage/blocksstable/ob_bloom_filter_data_writer.cpp +++ b/src/storage/blocksstable/ob_bloom_filter_data_writer.cpp @@ -345,8 +345,7 @@ int ObBloomFilterMacroBlockWriter::flush_macro_block() ObMacroBlockHandle macro_handle; ObMacroBlockWriteInfo macro_write_info; macro_write_info.buffer_ = data_buffer_.data(); - macro_write_info.size_ = data_buffer_.capacity(); - macro_write_info.size_ = OB_SERVER_BLOCK_MGR.get_macro_block_size(); + macro_write_info.size_ = data_buffer_.upper_align_length(); macro_write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE); macro_write_info.io_desc_.set_group_id(ObIOModule::BLOOM_FILTER_IO); if (OB_FAIL(ObBlockManager::write_block(macro_write_info, macro_handle))) { diff --git a/src/storage/blocksstable/ob_data_buffer.h b/src/storage/blocksstable/ob_data_buffer.h index 1949cc8f4a..d46822b04a 100644 --- a/src/storage/blocksstable/ob_data_buffer.h +++ b/src/storage/blocksstable/ob_data_buffer.h @@ -59,6 +59,11 @@ public: return pos_; } + inline int64_t upper_align_length() const + { + return upper_align(pos_, DIO_ALIGN_SIZE); + } + inline int64_t remain() const { return capacity_ - pos_; diff --git a/src/storage/blocksstable/ob_imacro_block_flush_callback.h b/src/storage/blocksstable/ob_imacro_block_flush_callback.h index c05b7bf67a..b026754570 100644 --- a/src/storage/blocksstable/ob_imacro_block_flush_callback.h +++ b/src/storage/blocksstable/ob_imacro_block_flush_callback.h @@ -30,6 +30,7 @@ public: virtual int write(const ObMacroBlockHandle ¯o_handle, const ObLogicMacroBlockId &logic_id, char *buf, + const int64_t buf_len, const int64_t data_seq) = 0; virtual int wait() = 0; }; diff --git a/src/storage/blocksstable/ob_macro_block.cpp b/src/storage/blocksstable/ob_macro_block.cpp index 71c29bef8b..40dc4e5dbe 100644 --- a/src/storage/blocksstable/ob_macro_block.cpp +++ b/src/storage/blocksstable/ob_macro_block.cpp @@ -843,7 +843,7 @@ int ObMacroBlock::flush(ObMacroBlockHandle ¯o_handle, } else { ObMacroBlockWriteInfo write_info; write_info.buffer_ = data_.data(); - write_info.size_ = data_.capacity(); + write_info.size_ = data_.upper_align_length(); write_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_WRITE); if (OB_FAIL(macro_handle.async_write(write_info))) { STORAGE_LOG(WARN, "Fail to async write block", K(ret), K(macro_handle)); diff --git a/src/storage/blocksstable/ob_macro_block_writer.cpp b/src/storage/blocksstable/ob_macro_block_writer.cpp index a1426ef61a..83cec0cc84 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.cpp +++ b/src/storage/blocksstable/ob_macro_block_writer.cpp @@ -1246,6 +1246,7 @@ int ObMacroBlockWriter::flush_macro_block(ObMacroBlock ¯o_block) } else if (OB_NOT_NULL(callback_) && OB_FAIL(callback_->write(macro_handle, cur_logic_id, macro_block.get_data_buf(), + upper_align(macro_block.get_data_size(), DIO_ALIGN_SIZE), current_macro_seq_))) { STORAGE_LOG(WARN, "fail to do callback flush", K(ret)); } diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.cpp b/src/storage/ddl/ob_ddl_redo_log_writer.cpp index 7c979351bd..c5b829ad5e 100755 --- a/src/storage/ddl/ob_ddl_redo_log_writer.cpp +++ b/src/storage/ddl/ob_ddl_redo_log_writer.cpp @@ -1574,6 +1574,7 @@ int ObDDLRedoLogWriterCallback::init(const ObDDLMacroBlockType block_type, int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle, const ObLogicMacroBlockId &logic_id, char *buf, + const int64_t buf_len, const int64_t data_seq) { int ret = OB_SUCCESS; @@ -1585,7 +1586,7 @@ int ObDDLRedoLogWriterCallback::write(const ObMacroBlockHandle ¯o_handle, } else { macro_block_id_ = macro_handle.get_macro_id(); redo_info_.table_key_ = table_key_; - redo_info_.data_buffer_.assign(buf, OB_SERVER_BLOCK_MGR.get_macro_block_size()); + redo_info_.data_buffer_.assign(buf, buf_len); redo_info_.block_type_ = block_type_; redo_info_.logic_id_ = logic_id; redo_info_.start_scn_ = ddl_writer_->get_start_scn(); diff --git a/src/storage/ddl/ob_ddl_redo_log_writer.h b/src/storage/ddl/ob_ddl_redo_log_writer.h index 9013a538f9..cc16dfdc08 100644 --- a/src/storage/ddl/ob_ddl_redo_log_writer.h +++ b/src/storage/ddl/ob_ddl_redo_log_writer.h @@ -343,6 +343,7 @@ public: const ObMacroBlockHandle ¯o_handle, const blocksstable::ObLogicMacroBlockId &logic_id, char *buf, + const int64_t buf_len, const int64_t data_seq); int wait(); int prepare_block_buffer_if_need(); diff --git a/src/storage/high_availability/ob_storage_ha_macro_block_writer.cpp b/src/storage/high_availability/ob_storage_ha_macro_block_writer.cpp index d2d3fe113b..bd1eae3945 100644 --- a/src/storage/high_availability/ob_storage_ha_macro_block_writer.cpp +++ b/src/storage/high_availability/ob_storage_ha_macro_block_writer.cpp @@ -153,7 +153,7 @@ int ObStorageHAMacroBlockWriter::process(blocksstable::ObMacroBlocksWriteCtx &co LOG_WARN("header is reuse macro block", K(ret)); } else { write_info.buffer_ = data.data(); - write_info.size_ = data.capacity(); + write_info.size_ = data.upper_align_length(); write_handle.reset(); if (OB_FAIL(ObBlockManager::async_write_block(write_info, write_handle))) { STORAGE_LOG(WARN, "fail to async write block", K(ret), K(write_info), K(write_handle)); diff --git a/src/storage/slog_ckpt/ob_linked_macro_block_writer.cpp b/src/storage/slog_ckpt/ob_linked_macro_block_writer.cpp index 2aba55a8b5..a967e756bb 100644 --- a/src/storage/slog_ckpt/ob_linked_macro_block_writer.cpp +++ b/src/storage/slog_ckpt/ob_linked_macro_block_writer.cpp @@ -51,45 +51,39 @@ int ObLinkedMacroBlockWriter::write_block(const char *buf, const int64_t buf_len if (OB_UNLIKELY(!is_inited_)) { ret = OB_NOT_INIT; LOG_WARN("ObLinkedMacroBlockWriter has not been inited", K(ret)); - } else if (OB_UNLIKELY(nullptr == buf)) { + } else if (OB_UNLIKELY(nullptr == buf || buf_len < 0)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), KP(buf)); + LOG_WARN("invalid argument", K(ret), KP(buf), K(buf_len)); } else { - const int64_t macro_block_size = OB_SERVER_BLOCK_MGR.get_macro_block_size(); - if (buf_len != macro_block_size) { - ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(macro_block_size), K(buf_len)); - } else { - ObMacroBlockWriteInfo write_info; - write_info.size_ = macro_block_size; - write_info.io_desc_ = io_desc_; - write_info.buffer_ = buf; - write_info.io_desc_.set_group_id(ObIOModule::LINKED_MACRO_BLOCK_IO); - MacroBlockId previous_block_id; - previous_block_id.set_block_index(MacroBlockId::EMPTY_ENTRY_BLOCK_INDEX); - if (!handle_.is_empty()) { - const int64_t io_timeout_ms = GCONF._data_storage_io_timeout / 1000L; - if (OB_FAIL(handle_.wait(io_timeout_ms))) { - LOG_WARN("fail to wait io finish", K(ret)); - } else { - previous_block_id = handle_.get_macro_id(); - pre_block_id = previous_block_id; - } + ObMacroBlockWriteInfo write_info; + write_info.size_ = buf_len; + write_info.io_desc_ = io_desc_; + write_info.buffer_ = buf; + write_info.io_desc_.set_group_id(ObIOModule::LINKED_MACRO_BLOCK_IO); + MacroBlockId previous_block_id; + previous_block_id.set_block_index(MacroBlockId::EMPTY_ENTRY_BLOCK_INDEX); + if (!handle_.is_empty()) { + const int64_t io_timeout_ms = GCONF._data_storage_io_timeout / 1000L; + if (OB_FAIL(handle_.wait(io_timeout_ms))) { + LOG_WARN("fail to wait io finish", K(ret)); + } else { + previous_block_id = handle_.get_macro_id(); + pre_block_id = previous_block_id; } + } - if (OB_SUCC(ret)) { - linked_header.set_previous_block_id(previous_block_id); - common_header.set_payload_checksum(static_cast( - ob_crc64(buf + common_header.get_serialize_size(), common_header.get_payload_size()))); - } + if (OB_SUCC(ret)) { + linked_header.set_previous_block_id(previous_block_id); + common_header.set_payload_checksum(static_cast( + ob_crc64(buf + common_header.get_serialize_size(), common_header.get_payload_size()))); + } - if (OB_SUCC(ret)) { - handle_.reset(); - if (OB_FAIL(ObBlockManager::async_write_block(write_info, handle_))) { - LOG_WARN("fail to async write block", K(ret), K(write_info), K(handle_)); - } else if (OB_FAIL(write_ctx_.add_macro_block_id(handle_.get_macro_id()))) { - LOG_WARN("fail to add macro id", K(ret), "macro id", handle_.get_macro_id()); - } + if (OB_SUCC(ret)) { + handle_.reset(); + if (OB_FAIL(ObBlockManager::async_write_block(write_info, handle_))) { + LOG_WARN("fail to async write block", K(ret), K(write_info), K(handle_)); + } else if (OB_FAIL(write_ctx_.add_macro_block_id(handle_.get_macro_id()))) { + LOG_WARN("fail to add macro id", K(ret), "macro id", handle_.get_macro_id()); } } } @@ -371,8 +365,9 @@ int ObLinkedMacroBlockItemWriter::write_block() common_header_->set_payload_size( static_cast(io_buf_pos_ - common_header_->get_serialize_size())); MacroBlockId pre_block_id; + const int64_t upper_align_size = upper_align(io_buf_pos_, DIO_ALIGN_SIZE); if (OB_FAIL(block_writer_.write_block( - io_buf_, io_buf_size_, *common_header_, *linked_header_, pre_block_id))) { + io_buf_, upper_align_size, *common_header_, *linked_header_, pre_block_id))) { LOG_WARN("fail to write block", K(ret)); } else if (OB_FAIL(set_pre_block_inflight_items_addr(pre_block_id))) { LOG_WARN("fail to set pre block inflight items addr", K(ret));