From a2e86378c6fc82d57b6c5fa1603f5cae07d9f9d9 Mon Sep 17 00:00:00 2001 From: hiddenbomb Date: Tue, 15 Jun 2021 14:51:46 +0800 Subject: [PATCH] adapt to 4k align size for slog and clog (#94) --- src/clog/ob_clog_file_writer.cpp | 48 +++++++++++-------- src/clog/ob_clog_file_writer.h | 2 +- src/clog/ob_log_define.h | 7 ++- src/clog/ob_log_direct_reader.cpp | 18 +++++-- .../slog/ob_storage_log_reader.cpp | 20 +++++--- .../blocksstable/slog/ob_storage_log_writer.h | 5 +- unittest/storage/blocksstable/CMakeLists.txt | 1 + .../slog/test_storage_log_reader_writer.cpp | 23 ++++----- 8 files changed, 77 insertions(+), 47 deletions(-) diff --git a/src/clog/ob_clog_file_writer.cpp b/src/clog/ob_clog_file_writer.cpp index 898bb0fd0..7b9411b8a 100644 --- a/src/clog/ob_clog_file_writer.cpp +++ b/src/clog/ob_clog_file_writer.cpp @@ -255,16 +255,20 @@ int ObCLogBaseFileWriter::append_trailer_entry(const uint32_t info_block_offset) ObLogFileTrailer trailer; int64_t pos = 0; const file_id_t phy_file_id = file_id_ + 1; - char* buf = shm_data_buf_; + // build trailer from last 512 byte offset (4096-512) + int64_t trailer_pos = CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; + char *buf = shm_data_buf_ + trailer_pos; reset_buf(); if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret)); + CLOG_LOG(WARN, "file_offset_ mismatch trailer offset", K(ret), K_(file_offset), + LITERAL_K(CLOG_TRAILER_OFFSET)); } else if (OB_FAIL(trailer.build_serialized_trailer(buf, CLOG_TRAILER_SIZE, info_block_offset, phy_file_id, pos))) { - CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), K(info_block_offset), K_(file_id), K(phy_file_id)); + CLOG_LOG(WARN, "build_serialized_trailer fail", K(ret), LITERAL_K(CLOG_DIO_ALIGN_SIZE), + K(info_block_offset), K_(file_id), K(phy_file_id)); } else { - buf_write_pos_ += (uint32_t)CLOG_TRAILER_SIZE; + buf_write_pos_ += (uint32_t)CLOG_DIO_ALIGN_SIZE; } return ret; @@ -275,12 
+279,13 @@ int ObCLogBaseFileWriter::flush_trailer_entry() int ret = OB_SUCCESS; if (CLOG_TRAILER_OFFSET != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), "CLOG_TRAILER_OFFSET", CLOG_TRAILER_OFFSET); - } else if (CLOG_TRAILER_SIZE != buf_write_pos_) { + CLOG_LOG(WARN, "file offset mismatch", K_(file_offset), LITERAL_K(CLOG_TRAILER_OFFSET)); + } else if (CLOG_DIO_ALIGN_SIZE != buf_write_pos_) { ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), "CLOG_TRAILER_SIZE", CLOG_TRAILER_SIZE); - } else if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, file_offset_))) { - CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset), K(errno)); + CLOG_LOG(WARN, "buf write position mismatch", K_(buf_write_pos), LITERAL_K(CLOG_DIO_ALIGN_SIZE)); + } else if (OB_FAIL(store_->write(shm_data_buf_, buf_write_pos_, CLOG_TRAILER_ALIGN_WRITE_OFFSET))) { + CLOG_LOG(ERROR, "write fail", K(ret), K(buf_write_pos_), K_(file_offset), + LITERAL_K(CLOG_TRAILER_ALIGN_WRITE_OFFSET), K(errno)); } return ret; } @@ -381,16 +386,18 @@ int ObCLogBaseFileWriter::append_padding_entry(const uint32_t padding_size) return ret; } -int ObCLogBaseFileWriter::cache_buf(ObLogCache* log_cache) +int ObCLogBaseFileWriter::cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len) { int ret = OB_SUCCESS; - char* buf = shm_data_buf_; - if (buf_write_pos_ > 0) { + if (OB_ISNULL(buf) || 0 == buf_len) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid args", K(ret), KP(buf), K(buf_len)); + } else { const common::ObAddr addr = GCTX.self_addr_; - if (OB_FAIL(log_cache->append_data(addr, buf, file_id_, file_offset_, buf_write_pos_))) { - CLOG_LOG(WARN, "fail to cache buf, ", K(ret), K_(file_id), K_(file_offset), K_(buf_write_pos)); + if (OB_FAIL(log_cache->append_data(addr, buf, file_id_, file_offset_, buf_len))) { + CLOG_LOG(WARN, "fail to cache buf, ", K(ret), K_(file_id), 
K_(file_offset), K(buf_len)); } else { - file_offset_ += buf_write_pos_; + file_offset_ += buf_len; } } return ret; @@ -659,7 +666,7 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob CLOG_LOG(WARN, "fail to add info block", K(ret), K(info_getter)); } else if (OB_FAIL(flush_buf())) { CLOG_LOG(WARN, "fail to flush info block", K(ret)); - } else if (OB_FAIL(cache_buf(log_cache))) { + } else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) { CLOG_LOG(WARN, "fail to cache info block", K(ret)); } } @@ -673,16 +680,17 @@ int ObCLogLocalFileWriter::end_current_file(ObIInfoBlockHandler* info_getter, Ob // - Flush trailer entry to log file // - Cache trailer entry to log cache + char *trailer_buf = shm_data_buf_ + CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE; if (OB_SUCC(ret)) { if (OB_FAIL(append_trailer_entry(info_block_offset))) { CLOG_LOG(WARN, "fail to add trailer", K(ret)); } else if (OB_FAIL(flush_trailer_entry())) { CLOG_LOG(WARN, "fail to flush trailer", K(ret)); - } else if (OB_FAIL(cache_buf(log_cache))) { - CLOG_LOG(WARN, "fail to cache trailer", K(ret)); + } else if (OB_FAIL(cache_buf(log_cache, trailer_buf, CLOG_TRAILER_SIZE))) { + CLOG_LOG(WARN, "fail to cache trailer", K(ret), KP(trailer_buf), LITERAL_K(CLOG_TRAILER_SIZE)); } else if (CLOG_FILE_SIZE != file_offset_) { // Defense code ret = OB_ERR_UNEXPECTED; - CLOG_LOG(WARN, "file_offset_ mismatch file size", K(ret)); + CLOG_LOG(WARN, "file_offset_ mismatch file size", K(ret), K_(file_offset)); } else { tail->advance(file_id_ + 1, 0); reset_buf(); @@ -713,7 +721,7 @@ int ObCLogLocalFileWriter::cache_last_padding_entry(ObLogCache* log_cache) padding_size = ObPaddingEntry::get_padding_size(file_offset_, align_size_); if (OB_FAIL(append_padding_entry(padding_size))) { CLOG_LOG(WARN, "inner add padding entry error", K(ret), K(padding_size)); - } else if (OB_FAIL(cache_buf(log_cache))) { + } else if (OB_FAIL(cache_buf(log_cache, shm_data_buf_, buf_write_pos_))) { 
CLOG_LOG(WARN, "fail to cache last padding", K(ret)); } } diff --git a/src/clog/ob_clog_file_writer.h b/src/clog/ob_clog_file_writer.h index 650a4743c..fc00d7370 100644 --- a/src/clog/ob_clog_file_writer.h +++ b/src/clog/ob_clog_file_writer.h @@ -96,7 +96,7 @@ class ObCLogBaseFileWriter { int append_trailer_entry(const uint32_t info_block_offset); int flush_trailer_entry(); // append all data in buffer to log cache - int cache_buf(ObLogCache* log_cache); + int cache_buf(ObLogCache *log_cache, const char *buf, const uint32_t buf_len); OB_INLINE bool need_align() const { diff --git a/src/clog/ob_log_define.h b/src/clog/ob_log_define.h index 9e0b41af0..948ed2d2e 100644 --- a/src/clog/ob_log_define.h +++ b/src/clog/ob_log_define.h @@ -29,19 +29,22 @@ namespace oceanbase { namespace clog { +#define CLOG_DIO_ALIGN_SIZE 4096 +#define TMP_SUFFIX ".tmp" + typedef uint32_t file_id_t; typedef int32_t offset_t; const int64_t CLOG_RPC_TIMEOUT = 3000 * 1000 - 100 * 1000; const int64_t CLOG_TRAILER_SIZE = 512; const int64_t CLOG_TRAILER_OFFSET = CLOG_FILE_SIZE - CLOG_TRAILER_SIZE; // 512B for the trailer block +const int64_t CLOG_TRAILER_ALIGN_WRITE_OFFSET = CLOG_FILE_SIZE - + CLOG_DIO_ALIGN_SIZE; // 4k aligned write const int64_t CLOG_MAX_DATA_OFFSET = CLOG_TRAILER_OFFSET - common::OB_MAX_LOG_BUFFER_SIZE; const int64_t CLOG_CACHE_SIZE = 64 * 1024; const int64_t CLOG_REPLAY_CHECKSUM_WINDOW_SIZE = 1 << 9; const int64_t CLOG_INFO_BLOCK_SIZE_LIMIT = 1 << 22; const offset_t OB_INVALID_OFFSET = -1; -#define CLOG_DIO_ALIGN_SIZE 4096 -#define TMP_SUFFIX ".tmp" inline bool is_valid_log_id(const uint64_t log_id) { diff --git a/src/clog/ob_log_direct_reader.cpp b/src/clog/ob_log_direct_reader.cpp index 58a923bc4..92b22ff6e 100644 --- a/src/clog/ob_log_direct_reader.cpp +++ b/src/clog/ob_log_direct_reader.cpp @@ -1073,10 +1073,13 @@ int ObLogDirectReader::read_trailer( ObReadRes res; ObReadParam trailer_param; trailer_param.file_id_ = param.file_id_; - trailer_param.offset_ = 
CLOG_TRAILER_OFFSET; - trailer_param.read_len_ = CLOG_TRAILER_SIZE; + trailer_param.offset_ = CLOG_TRAILER_ALIGN_WRITE_OFFSET; // 4k aligned write, but data is in last 512bytes + trailer_param.read_len_ = CLOG_DIO_ALIGN_SIZE; trailer_param.timeout_ = param.timeout_; + const char *trailer_buf = NULL; + int64_t trailer_len = 0; + // always read trailed from disk, handling error code specially if (OB_SUCCESS != (ret = read_data_direct_impl(trailer_param, rbuf, res, cost))) { if (OB_READ_NOTHING == ret) { @@ -1084,8 +1087,15 @@ int ObLogDirectReader::read_trailer( } else { CLOG_LOG(WARN, "read trailer data error", K(ret), K(trailer_param)); } - } else if (OB_FAIL(trailer.deserialize(res.buf_, res.data_len_, pos))) { - CLOG_LOG(WARN, "trailer deserialize fail", K(ret), K(res), K(pos)); + } else { + trailer_buf = res.buf_ + (CLOG_DIO_ALIGN_SIZE - CLOG_TRAILER_SIZE); + trailer_len = CLOG_TRAILER_SIZE; + } + + if (OB_FAIL(ret)) { + CLOG_LOG(WARN, "fail to read trailer data", K(ret)); + } else if (OB_FAIL(trailer.deserialize(trailer_buf, trailer_len, pos))) { + CLOG_LOG(WARN, "trailer deserialize fail", K(ret), KP(trailer_buf), K(trailer_len), K(res), K(pos)); } else if (OB_UNLIKELY(trailer.get_file_id() != trailer_param.file_id_ + 1)) { ret = OB_INVALID_DATA; CLOG_LOG(WARN, diff --git a/src/storage/blocksstable/slog/ob_storage_log_reader.cpp b/src/storage/blocksstable/slog/ob_storage_log_reader.cpp index 34622eb88..5030eab69 100644 --- a/src/storage/blocksstable/slog/ob_storage_log_reader.cpp +++ b/src/storage/blocksstable/slog/ob_storage_log_reader.cpp @@ -54,8 +54,7 @@ int ObStorageLogReader::init(const char* log_dir, const uint64_t log_file_id_sta if (OB_SUCC(ret)) { if (NULL == log_buffer_.get_data()) { ObMemAttr attr(OB_SERVER_TENANT_ID, ObModIds::OB_LOG_READER); - char* buf = - static_cast(ob_malloc_align(OB_DIRECT_IO_ALIGN, ObStorageLogWriter::LOG_ITEM_MAX_LENGTH, attr)); + char *buf = static_cast(ob_malloc_align(DIO_READ_ALIGN_SIZE, 
ObStorageLogWriter::LOG_ITEM_MAX_LENGTH, attr)); if (OB_ISNULL(buf)) { ret = OB_ERROR; STORAGE_REDO_LOG(WARN, "ob_malloc for log_buffer_ failed", K(ret)); @@ -391,10 +390,18 @@ int ObStorageLogReader::get_next_cursor(common::ObLogCursor& cursor) const int ObStorageLogReader::load_buf() { int ret = OB_SUCCESS; - if ((0 != log_buffer_.get_capacity() % DIO_ALIGN_SIZE) || (0 != pread_pos_ % DIO_ALIGN_SIZE) || - (log_buffer_.get_remain_data_len() < 0) || (log_buffer_.get_remain_data_len() > pread_pos_)) { // Defense code + if ((0 != log_buffer_.get_capacity() % DIO_READ_ALIGN_SIZE) + || (log_buffer_.get_remain_data_len() < 0) + || (log_buffer_.get_remain_data_len() > pread_pos_)) { // Defense code ret = OB_LOG_NOT_ALIGN; STORAGE_REDO_LOG(WARN, "buf or read pos are not aligned", K(ret), K_(log_buffer), K_(pread_pos)); + } else if (0 != pread_pos_ % DIO_READ_ALIGN_SIZE) { + // pread_pos_ should be 4k aligned because file handler returned read size is always 4k aligned, + // if pread_pos_ is not aligned, it means file reaches end and file size is not 4k aligned, + // then we have no need to load buf again + ret = OB_READ_NOTHING; + STORAGE_REDO_LOG(INFO, "pread_pos_ reaches the end of file, and file size is not 4k aligned", + K(ret), K_(pread_pos)); } else if (log_buffer_.get_remain_data_len() == log_buffer_.get_capacity()) { // do nothing if buf hasn't been consumed STORAGE_REDO_LOG(WARN, "buf remains same", K(ret), K_(log_buffer), K_(pread_pos)); @@ -404,9 +411,10 @@ int ObStorageLogReader::load_buf() // Move the next log entry to the beginning of the buffer so that need to adjust pread_pos_ // back to align the DIO read. - pread_pos_ = lower_align(pread_pos_ - remain_size, OB_DIRECT_IO_ALIGN); + pread_pos_ = lower_align(pread_pos_ - remain_size, DIO_READ_ALIGN_SIZE); log_buffer_.get_limit() = 0; - log_buffer_.get_position() = (0 == remain_size) ? 0 : upper_align(remain_size, OB_DIRECT_IO_ALIGN) - remain_size; + log_buffer_.get_position() = (0 == remain_size) ? 
0 + : upper_align(remain_size, DIO_READ_ALIGN_SIZE) - remain_size; if (OB_FAIL(file_store_->read(log_buffer_.get_data(), log_buffer_.get_capacity(), pread_pos_, read_size))) { STORAGE_REDO_LOG(ERROR, diff --git a/src/storage/blocksstable/slog/ob_storage_log_writer.h b/src/storage/blocksstable/slog/ob_storage_log_writer.h index 59fa9ab0a..7c6fb0197 100644 --- a/src/storage/blocksstable/slog/ob_storage_log_writer.h +++ b/src/storage/blocksstable/slog/ob_storage_log_writer.h @@ -75,9 +75,9 @@ class ObStorageLogItem : public common::ObIBaseLogItem { class ObStorageLogWriter : public common::ObBaseLogWriter { public: - static const int64_t LOG_FILE_ALIGN_SIZE = 1 << common::OB_DIRECT_IO_ALIGN_BITS; + static const int64_t LOG_FILE_ALIGN_SIZE = 4 * 1024; // 4KB static const int64_t LOG_BUF_RESERVED_SIZE = 3 * LOG_FILE_ALIGN_SIZE; // NOP + switch_log - static const int64_t LOG_ITEM_MAX_LENGTH = 32 << 20; // 32MB + static const int64_t LOG_ITEM_MAX_LENGTH = 32 << 20; // 32MB ObStorageLogWriter(); virtual ~ObStorageLogWriter(); @@ -194,7 +194,6 @@ class ObStorageLogWriter : public common::ObBaseLogWriter { common::ObIBaseLogItem** items, const int64_t item_cnt, int64_t& sync_idx, const int64_t cur_idx); int aggregate_logs_to_buffer(common::ObIBaseLogItem** items, const int64_t item_cnt, const int64_t sync_idx, const int64_t cur_idx, char*& write_buf, int64_t& write_len); - ; int advance_log_items(common::ObIBaseLogItem** items, const int64_t item_cnt, const int64_t cur_idx); int advance_single_item(const int64_t cur_file_id, ObStorageLogItem& log_item); diff --git a/unittest/storage/blocksstable/CMakeLists.txt b/unittest/storage/blocksstable/CMakeLists.txt index 183eac5fa..893caad9a 100644 --- a/unittest/storage/blocksstable/CMakeLists.txt +++ b/unittest/storage/blocksstable/CMakeLists.txt @@ -18,3 +18,4 @@ storage_unittest(test_bloom_filter_data) storage_unittest(test_micro_block_index_cache) storage_unittest(test_ref_cnt) storage_unittest(test_macro_block_id) 
+storage_unittest(test_storage_log_reader_writer slog/test_storage_log_reader_writer.cpp) diff --git a/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp b/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp index bcfef373b..f1fe06a70 100644 --- a/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp +++ b/unittest/storage/blocksstable/slog/test_storage_log_reader_writer.cpp @@ -368,7 +368,7 @@ TEST_F(TestStorageLogReaderWriter, large_item_batch_write) { int ret = OB_SUCCESS; const char LOG_DIR[512] = "./test_storage_log_rw"; - const int64_t LOG_FILE_SIZE = 4 << 10; // 4K + const int64_t LOG_FILE_SIZE = 12 * 1024; // 12K const int64_t CONCURRENT_TRANS_CNT = 128; const int64_t LOG_BUFFER_SIZE = 512 * 1024; // 512K @@ -451,12 +451,13 @@ TEST_F(TestStorageLogReaderWriter, revise) start_cursor.log_id_ = 1; start_cursor.offset_ = 0; - char write_data[800]; - MEMSET(write_data, 1, 800); + const int data_size = 5000; + char write_data[data_size]; + MEMSET(write_data, 1, data_size); ObBaseStorageLogBuffer log_buf; - ret = log_buf.assign(write_data, 800); + ret = log_buf.assign(write_data, data_size); ASSERT_EQ(OB_SUCCESS, ret); - ret = log_buf.set_pos(800); + ret = log_buf.set_pos(data_size); ASSERT_EQ(OB_SUCCESS, ret); ObStorageLogWriter writer; @@ -465,7 +466,7 @@ TEST_F(TestStorageLogReaderWriter, revise) ret = writer.start_log(start_cursor); ASSERT_EQ(OB_SUCCESS, ret); - // write 3 logs so that valid data length is 3K + // write 3 logs; each 5000-byte log pads up to 8K (2 x 4K), so valid data length is 8K * 3 = 24576 for (int64_t i = 0; i < 3; ++i) { start_cursor.reset(); ret = writer.flush_log(LogCommand::OB_LOG_DUMMY_LOG, log_buf, start_cursor); @@ -476,7 +477,7 @@ TEST_F(TestStorageLogReaderWriter, revise) } // truncate the file so that last log is incomplete - ASSERT_TRUE(0 == ::truncate("./test_storage_log_rw/1", 2560)); + ASSERT_TRUE(0 == ::truncate("./test_storage_log_rw/1", 20480)); // revise log ObStorageLogReader reader; @@ -489,7 +490,7 @@
TEST_F(TestStorageLogReaderWriter, revise) int64_t revise_size = 0; ret = FileDirectoryUtils::get_file_size("./test_storage_log_rw/1", revise_size); ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(2048, revise_size); + ASSERT_EQ(16384, revise_size); } // the last log file has switch file entry at the end @@ -498,7 +499,7 @@ TEST_F(TestStorageLogReaderWriter, switch_file_revise) { int ret = OB_SUCCESS; const char LOG_DIR[512] = "./test_storage_log_rw"; - const int64_t LOG_FILE_SIZE = 2048; // 2KB + const int64_t LOG_FILE_SIZE = 16 * 1024; // 16KB const int64_t CONCURRENT_TRANS_CNT = 8; const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB @@ -545,14 +546,14 @@ TEST_F(TestStorageLogReaderWriter, switch_file_revise) int64_t revise_size = 0; ret = FileDirectoryUtils::get_file_size("./test_storage_log_rw/1", revise_size); ASSERT_EQ(OB_SUCCESS, ret); - ASSERT_EQ(2048, revise_size); + ASSERT_EQ(3 * 4096, revise_size); // truncate last 4k } TEST_F(TestStorageLogReaderWriter, errsim_io_hung) { int ret = OB_SUCCESS; const char LOG_DIR[512] = "./test_storage_log_rw"; - const int64_t LOG_FILE_SIZE = 2048; // 2KB + const int64_t LOG_FILE_SIZE = 16 * 1024; // 16KB const int64_t CONCURRENT_TRANS_CNT = 8; const int64_t LOG_BUFFER_SIZE = 1966080L; // 1.875MB