From f6083f5351ca0a8309584516e08148d8ecd2bbb9 Mon Sep 17 00:00:00 2001 From: z404289981 Date: Thu, 23 Nov 2023 02:41:03 +0000 Subject: [PATCH] opt whole scnner io buf --- .../access/ob_sstable_row_whole_scanner.cpp | 31 ++- .../access/ob_sstable_row_whole_scanner.h | 11 +- .../blocksstable/ob_imicro_block_writer.cpp | 221 +----------------- .../blocksstable/ob_imicro_block_writer.h | 73 +----- .../blocksstable/ob_micro_block_writer.cpp | 2 +- .../ob_compaction_memory_context.cpp | 207 ++++++++++++++++ .../compaction/ob_compaction_memory_context.h | 90 +++++++ .../compaction/ob_partition_merge_iter.cpp | 8 +- 8 files changed, 332 insertions(+), 311 deletions(-) diff --git a/src/storage/access/ob_sstable_row_whole_scanner.cpp b/src/storage/access/ob_sstable_row_whole_scanner.cpp index 4e3a49adab..b22b100c2e 100644 --- a/src/storage/access/ob_sstable_row_whole_scanner.cpp +++ b/src/storage/access/ob_sstable_row_whole_scanner.cpp @@ -42,17 +42,16 @@ ObSSTableRowWholeScanner::~ObSSTableRowWholeScanner() } } -int ObSSTableRowWholeScanner::alloc_io_buf() +int ObSSTableRowWholeScanner::alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size) { int ret = OB_SUCCESS; int64_t size = common::OB_DEFAULT_MACRO_BLOCK_SIZE * PREFETCH_DEPTH; - if (OB_ISNULL(buf_ = reinterpret_cast(io_allocator_.alloc(size)))) { //continuous memory - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to alloc ObSSTableRowWholeScanner read info buffer", K(ret), K(size)); - } else { - for (int64_t i = 0; OB_SUCC(ret) && i < PREFETCH_DEPTH; ++i) { - io_buf_[i] = buf_ + common::OB_DEFAULT_MACRO_BLOCK_SIZE * i; + if (OB_LIKELY(io_buf.is_inited())) { + if (OB_FAIL(io_buf.reserve(buf_size))) { + LOG_WARN("fail to reserve io buf", K(ret), K(io_buf), K(buf_size)); } + } else if (OB_FAIL(io_buf.init(common::OB_DEFAULT_MACRO_BLOCK_SIZE, buf_size))) { + LOG_WARN("fail to init io buf", K(ret), K(io_buf), K(buf_size)); } return ret; } @@ -77,8 +76,6 @@ void ObSSTableRowWholeScanner::reset() micro_scanner_ = nullptr; } allocator_.reset(); - io_allocator_.reset(); - buf_ = nullptr; is_inited_ = false; last_micro_block_recycled_ = false; last_mvcc_row_already_output_ = false; @@ -86,7 +83,7 @@ void ObSSTableRowWholeScanner::reset() void ObSSTableRowWholeScanner::reuse() { - ObStoreRowIterator::reuse(); + ObStoreRowIterator::reset(); iter_param_ = nullptr; access_ctx_ = nullptr; sstable_ = nullptr; @@ -104,8 +101,6 @@ void ObSSTableRowWholeScanner::reuse() micro_scanner_ = nullptr; } allocator_.reuse(); - io_allocator_.reuse(); - buf_ = nullptr; is_inited_ = false; last_micro_block_recycled_ = false; last_mvcc_row_already_output_ = false; @@ -208,8 +203,6 @@ int ObSSTableRowWholeScanner::inner_open( rowkey_read_info = iter_param.read_info_; } if (OB_FAIL(ret)) { - } else if (OB_FAIL(alloc_io_buf())) { - LOG_WARN("alloc io buffers failed", K(ret)); } else if (OB_FAIL(init_micro_scanner(range))) { LOG_WARN("Failed to init micro scanner", K(ret)); } else if (OB_FAIL(macro_block_iter_.open( @@ -272,8 +265,8 @@ int ObSSTableRowWholeScanner::open( MacroScanHandle &scan_handle = scan_handles_[0]; scan_handle.reset(); - if (OB_FAIL(alloc_io_buf())) { - LOG_WARN("alloc io buffers failed", K(ret)); + if (OB_FAIL(alloc_io_buf(io_buf_[0], sstable_->get_macro_read_size()))) { + LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size())); } else if (OB_FAIL(init_micro_scanner(&query_range))) { LOG_WARN("Fail to init micro scanner", K(ret)); } else { @@ -287,7 +280,7 @@ int ObSSTableRowWholeScanner::open( read_info.size_ = sstable_->get_macro_read_size(); read_info.io_desc_.set_wait_event(ObWaitEventIds::DB_FILE_COMPACT_READ); read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO); - read_info.buf_ = io_buf_[0]; + read_info.buf_ = io_buf_[0].data(); if (OB_FAIL(ret)) { } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) { @@ -424,6 +417,8 @@ int ObSSTableRowWholeScanner::prefetch() } else { LOG_WARN("Fail to get_next_macro_block ", K(ret), K(macro_block_iter_)); } + } else if (OB_FAIL(alloc_io_buf(io_buf_[io_index], sstable_->get_macro_read_size()))) { + LOG_WARN("alloc io buffers failed", K(ret), K(sstable_->get_macro_read_size())); } else { scan_handle.is_left_border_ = (0 == prefetch_macro_cursor_); scan_handle.is_right_border_ = false; // set right border correctly when open macro block @@ -433,7 +428,7 @@ int ObSSTableRowWholeScanner::prefetch() read_info.size_ = sstable_->get_macro_read_size(); read_info.io_desc_.set_wait_event(common::ObWaitEventIds::DB_FILE_COMPACT_READ); read_info.io_desc_.set_group_id(ObIOModule::SSTABLE_WHOLE_SCANNER_IO); - read_info.buf_ = io_buf_[io_index]; + read_info.buf_ = io_buf_[io_index].data(); if (OB_FAIL(ret)) { } else if (OB_FAIL(ObBlockManager::async_read_block(read_info, scan_handle.macro_io_handle_))) { LOG_WARN("Fail to read macro block, ", K(ret), K(read_info)); diff --git a/src/storage/access/ob_sstable_row_whole_scanner.h b/src/storage/access/ob_sstable_row_whole_scanner.h index 80c3446d1c..4bd0c69e5e 100644 --- a/src/storage/access/ob_sstable_row_whole_scanner.h +++ b/src/storage/access/ob_sstable_row_whole_scanner.h @@ -56,7 +56,6 @@ public: access_ctx_(nullptr), sstable_(nullptr), allocator_(common::ObModIds::OB_SSTABLE_READER, OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), - io_allocator_("SSTRWS_IOUB", OB_MALLOC_NORMAL_BLOCK_SIZE, MTL_ID()), prefetch_macro_cursor_(0), cur_macro_cursor_(0), is_macro_prefetch_end_(false), @@ -68,12 +67,13 @@ public: last_micro_block_recycled_(false), last_mvcc_row_already_output_(false), iter_macro_cnt_(0), - buf_(nullptr) + io_buf_() {} virtual ~ObSSTableRowWholeScanner(); - int alloc_io_buf(); + int alloc_io_buf(compaction::ObCompactionBuffer &io_buf, int64_t buf_size); virtual void reset() override; + virtual void reuse() override; int open( const ObTableIterParam &iter_param, ObTableAccessContext &access_ctx, @@ -97,7 +97,6 @@ protected: ObITable *table, const void *query_range) override; virtual int inner_get_next_row(const blocksstable::ObDatumRow *&row) override; - virtual void reuse() override; private: int init_micro_scanner(const blocksstable::ObDatumRange *range); int open_macro_block(); @@ -126,7 +125,6 @@ private: blocksstable::ObSSTable *sstable_; blocksstable::ObDatumRange query_range_; common::ObArenaAllocator allocator_; - common::ObArenaAllocator io_allocator_; int64_t prefetch_macro_cursor_; int64_t cur_macro_cursor_; bool is_macro_prefetch_end_; @@ -140,8 +138,7 @@ private: bool last_micro_block_recycled_; bool last_mvcc_row_already_output_; int64_t iter_macro_cnt_; - char *buf_; - char *io_buf_[PREFETCH_DEPTH]; + compaction::ObCompactionBuffer io_buf_[PREFETCH_DEPTH]; }; } diff --git a/src/storage/blocksstable/ob_imicro_block_writer.cpp b/src/storage/blocksstable/ob_imicro_block_writer.cpp index 2c8faa0f23..45bba43d94 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.cpp +++ b/src/storage/blocksstable/ob_imicro_block_writer.cpp @@ -56,237 +56,34 @@ void ObMicroBlockDesc::reset() /** * -------------------------------------------------------------------ObMicroBufferWriter------------------------------------------------------------------- */ -int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(is_inited_)) { - ret = OB_INIT_TWICE; - STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_)); - } else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE - || capacity < reserve_size)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size)); - } else { - capacity_ = capacity; - len_ = 0; - data_= nullptr; - buffer_size_ = 0; - reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD; - memory_reclaim_cnt_ = 0; - } - - if (OB_SUCC(ret)) { - if(OB_FAIL(reserve(reserve_size))) { - STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size)); - } else { - default_reserve_ = reserve_size; - is_inited_ = true; - } - } - - return ret; -} - -void ObMicroBufferWriter::reset() -{ - if (data_ != nullptr) { - allocator_.free(data_); - data_ = nullptr; - } - has_expand_ = false; - memory_reclaim_cnt_ = 0; - reset_memory_threshold_ = 0; - default_reserve_ = 0; - len_ = 0; - buffer_size_ = 0; - capacity_ = 0; - is_inited_ = false; - allocator_.reset(); -} - -void ObMicroBufferWriter::reuse() -{ - if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) { - memory_reclaim_cnt_++; - if (memory_reclaim_cnt_ >= reset_memory_threshold_) { - reset_memory_threshold_ <<= 1; - memory_reclaim_cnt_ = 0; - void *buf = nullptr; - if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) { - int ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_)); - } else { - allocator_.free(data_); - buffer_size_ = default_reserve_; - data_ = reinterpret_cast(buf); - } - } - } else { - memory_reclaim_cnt_ = 0; - } - has_expand_ = false; - len_ = 0; -} - -int ObMicroBufferWriter::expand(const int64_t size) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_)); - } else { - int64_t expand_size = buffer_size_ * 2; - while (expand_size < size) { - expand_size <<= 1; - } - expand_size = MIN(expand_size, capacity_); - if (OB_FAIL(reserve(expand_size))) { - STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size)); - } - } - - return ret; -} - -int ObMicroBufferWriter::reserve(const int64_t size) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(size < 0 || size > capacity_)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_)); - } else if (size <= buffer_size_) {//do nothing - } else { - void* buf = nullptr; - const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE); - if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) { - ret = OB_ALLOCATE_MEMORY_FAILED; - STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size)); - } else if (data_ != nullptr) { - has_expand_ = true; - MEMCPY(buf, data_, len_); - allocator_.free(data_); - data_ = nullptr; - } - if (OB_SUCC(ret)) { - data_ = reinterpret_cast(buf); - buffer_size_ = alloc_size; - } - } - - return ret; -} - -int ObMicroBufferWriter::ensure_space(const int64_t append_size) -{ - int ret = OB_SUCCESS; - - if (len_ + append_size > capacity_) { - ret = OB_BUF_NOT_ENOUGH; - } else if (len_ + append_size > buffer_size_) { - if (OB_FAIL(expand(len_ + append_size))) { - STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size)); - } - } - - return ret; -} - -int ObMicroBufferWriter::write_nop(const int64_t size, bool is_zero) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(size < 0)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_)); - } else if (OB_FAIL(ensure_space(size))) { - if (ret != OB_BUF_NOT_ENOUGH) { - STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size)); - } - } else { - if (is_zero) { - MEMSET(data_ + len_, 0, size); - } - len_ += size; - } - - return ret; -} - -int ObMicroBufferWriter::write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size) +int ObMicroBufferWriter::write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &size) { int ret = OB_SUCCESS; ObRowWriter row_writer; - if ((buffer_size_ == len_) && OB_FAIL(expand(buffer_size_))) { - STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_)); + if (remain_buffer_size() <= 0 && OB_FAIL(expand(ObCompactionBuffer::size()))) { + STORAGE_LOG(WARN, "failed to reserve", K(ret)); } while (OB_SUCC(ret)) { - if (OB_SUCC(row_writer.write(rowkey_cnt, row, data_ + len_, buffer_size_ - len_, size))) { + if (OB_SUCC(row_writer.write(rowkey_cnt, row, current(), remain_buffer_size(), size))) { break; } else { if (OB_UNLIKELY(ret != OB_BUF_NOT_ENOUGH)) { - STORAGE_LOG(WARN, "failed to write row", K(ret), K(buffer_size_), K(capacity_)); - } else if (buffer_size_ >= capacity_) { //break - } else if (OB_FAIL(expand(buffer_size_))) { - STORAGE_LOG(WARN, "failed to reserve", K(ret), K(buffer_size_)); + STORAGE_LOG(WARN, "failed to write row", K(ret), KPC(this)); + } else if (!check_could_expand()) { //break + } else if (OB_FAIL(expand(ObCompactionBuffer::size()))) { + STORAGE_LOG(WARN, "failed to reserve", K(ret)); } } } if (OB_SUCC(ret)) { - len_ += size; + write_nop(size); } return ret; } -int ObMicroBufferWriter::write(const void *buf, int64_t size) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(buf == nullptr || size < 0)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_)); - } else if (OB_FAIL(ensure_space(size))) { - if (ret != OB_BUF_NOT_ENOUGH) { - STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size)); - } - } else { - MEMCPY(data_ + len_, buf, size); - len_ += size; - } - - return ret; -} - -int ObMicroBufferWriter::advance(const int64_t size) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_)); - } else { - len_ += size; - } - return ret; -} - -int ObMicroBufferWriter::set_length(const int64_t len) -{ - int ret = OB_SUCCESS; - - if (OB_UNLIKELY(len > buffer_size_)) { - ret = OB_INVALID_ARGUMENT; - STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_)); - } else { - len_ = len; - } - return ret; -} /** * -------------------------------------------------------------------ObIMicroBlockWriter------------------------------------------------------------------- */ diff --git a/src/storage/blocksstable/ob_imicro_block_writer.h b/src/storage/blocksstable/ob_imicro_block_writer.h index 9598b4a8a8..59c8224e0c 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.h +++ b/src/storage/blocksstable/ob_imicro_block_writer.h @@ -87,79 +87,14 @@ enum MICRO_BLOCK_MERGE_VERIFY_LEVEL ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE = 3, }; -class ObMicroBufferWriter final +class ObMicroBufferWriter : public compaction::ObCompactionBuffer { public: ObMicroBufferWriter(const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE) - : allocator_(MTL_ID(), "MicroBuffer"), - is_inited_(false), - capacity_(0), - buffer_size_(0), - len_(0), - data_(nullptr), - reset_memory_threshold_(0), - memory_reclaim_cnt_(0), - has_expand_(false) + : ObCompactionBuffer("MicroBuffer", page_size) {} - ~ObMicroBufferWriter() { reset(); }; - int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE); - inline bool is_inited() const { return is_inited_; } - inline int64_t remain() const { return capacity_ - len_; } - inline int64_t remain_buffer_size() const { return buffer_size_ - len_; } - inline int64_t size() const { return buffer_size_; } //curr buffer size - inline bool has_expand() const { return has_expand_; } - inline char *data() { return data_; } - inline char *current() { return data_ + len_; } - int reserve(const int64_t size); - int ensure_space(const int64_t append_size); - inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); } - int write_nop(const int64_t size, bool is_zero = false); - int write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len); - int write(const void *buf, int64_t size); - template - int write(const T &value) - { - int ret = OB_SUCCESS; - static_assert(std::is_trivially_copyable::value, "invalid type"); - if (OB_FAIL(ensure_space(sizeof(T)))) { - if (ret != OB_BUF_NOT_ENOUGH) { - STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T))); - } - } else { - *((T *)(data_ + len_)) = value; - len_ += sizeof(T); - } - return ret; - } - int advance(const int64_t size); - int set_length(const int64_t len); - - void reuse(); - void reset(); - inline int64_t length() const { return len_; } - TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold), - K_(memory_reclaim_cnt), K_(has_expand)); -private: - int expand(const int64_t size); -private: - compaction::ObLocalAllocator allocator_; - bool is_inited_; - int64_t capacity_; - int64_t buffer_size_; //curr buffer size - int64_t len_; //curr pos - char *data_; - - // for reclaim memory - int64_t default_reserve_; - int64_t reset_memory_threshold_; - int64_t memory_reclaim_cnt_; - bool has_expand_; - -private: - static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb - static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m - static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K - static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5; + virtual ~ObMicroBufferWriter() { reset(); }; + int write_row(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len); }; // Some common interface of ObMicroBlockWriter and ObMicroBlockEncoder, not all features. diff --git a/src/storage/blocksstable/ob_micro_block_writer.cpp b/src/storage/blocksstable/ob_micro_block_writer.cpp index f7c04317e7..bea3ccd447 100644 --- a/src/storage/blocksstable/ob_micro_block_writer.cpp +++ b/src/storage/blocksstable/ob_micro_block_writer.cpp @@ -173,7 +173,7 @@ int ObMicroBlockWriter::append_row(const ObDatumRow &row) ret = OB_INVALID_ARGUMENT; STORAGE_LOG(WARN, "append row column count is not consistent with init column count", K(get_header(data_buffer_)->column_count_), K(row.get_column_count()), K(ret)); - } else if (OB_FAIL(data_buffer_.write(row, rowkey_column_count_, pos))) { + } else if (OB_FAIL(data_buffer_.write_row(row, rowkey_column_count_, pos))) { if (OB_BUF_NOT_ENOUGH != ret) { STORAGE_LOG(WARN, "row writer fail to write row.", K(ret), K(rowkey_column_count_), K(row), K(OB_P(data_buffer_.remain())), K(pos)); diff --git a/src/storage/compaction/ob_compaction_memory_context.cpp b/src/storage/compaction/ob_compaction_memory_context.cpp index 6777382242..e1b90de192 100644 --- a/src/storage/compaction/ob_compaction_memory_context.cpp +++ b/src/storage/compaction/ob_compaction_memory_context.cpp @@ -162,6 +162,213 @@ void ObCompactionMemoryContext::mem_click() mem_peak_total_ = MAX(mem_peak_total_, mem_total); } + /** + * -------------------------------------------------------------------ObCompactionBuffer------------------------------------------------------------------- + */ +int ObCompactionBuffer::init(const int64_t capacity, const int64_t reserve_size) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(is_inited_)) { + ret = OB_INIT_TWICE; + STORAGE_LOG(WARN, "micro buffer writer is inited", K(ret), K(capacity_)); + } else if (OB_UNLIKELY(reserve_size < 0 || capacity > MAX_DATA_BUFFER_SIZE + || capacity < reserve_size)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(capacity), K(reserve_size)); + } else { + capacity_ = capacity; + len_ = 0; + data_= nullptr; + buffer_size_ = 0; + reset_memory_threshold_ = DEFAULT_RESET_MEMORY_THRESHOLD; + memory_reclaim_cnt_ = 0; + } + + if (OB_SUCC(ret)) { + if(OB_FAIL(reserve(reserve_size))) { + STORAGE_LOG(WARN, "failed to reserve", K(ret), K(reserve_size)); + } else { + default_reserve_ = reserve_size; + is_inited_ = true; + } + } + + return ret; +} + +void ObCompactionBuffer::reset() +{ + if (data_ != nullptr) { + allocator_.free(data_); + data_ = nullptr; + } + has_expand_ = false; + memory_reclaim_cnt_ = 0; + reset_memory_threshold_ = 0; + default_reserve_ = 0; + len_ = 0; + buffer_size_ = 0; + capacity_ = 0; + is_inited_ = false; + allocator_.reset(); +} + +void ObCompactionBuffer::reuse() +{ + if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) { + memory_reclaim_cnt_++; + if (memory_reclaim_cnt_ >= reset_memory_threshold_) { + reset_memory_threshold_ <<= 1; + memory_reclaim_cnt_ = 0; + void *buf = nullptr; + if (OB_ISNULL(buf = allocator_.alloc(default_reserve_))) { + int ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "failed to reclaim memory", K(ret), K(default_reserve_)); + } else { + allocator_.free(data_); + buffer_size_ = default_reserve_; + data_ = reinterpret_cast(buf); + } + } + } else { + memory_reclaim_cnt_ = 0; + } + has_expand_ = false; + len_ = 0; +} + +int ObCompactionBuffer::expand(const int64_t size) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(capacity_ <= buffer_size_ || size > capacity_)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(buffer_size_), K(capacity_)); + } else { + int64_t expand_size = buffer_size_ * 2; + while (expand_size < size) { + expand_size <<= 1; + } + expand_size = MIN(expand_size, capacity_); + if (OB_FAIL(reserve(expand_size))) { + STORAGE_LOG(WARN, "fail to reserve", K(ret), K(expand_size)); + } + } + + return ret; +} + +int ObCompactionBuffer::reserve(const int64_t size) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(size < 0 || size > capacity_)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(capacity_)); + } else if (size <= buffer_size_) {//do nothing + } else { + void* buf = nullptr; + const int64_t alloc_size = MAX(size, MIN_BUFFER_SIZE); + if (OB_ISNULL(buf = allocator_.alloc(alloc_size))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size)); + } else if (data_ != nullptr) { + has_expand_ = true; + MEMCPY(buf, data_, len_); + allocator_.free(data_); + data_ = nullptr; + } + if (OB_SUCC(ret)) { + data_ = reinterpret_cast(buf); + buffer_size_ = alloc_size; + } + } + + return ret; +} + +int ObCompactionBuffer::ensure_space(const int64_t append_size) +{ + int ret = OB_SUCCESS; + + if (len_ + append_size > capacity_) { + ret = OB_BUF_NOT_ENOUGH; + } else if (len_ + append_size > buffer_size_) { + if (OB_FAIL(expand(len_ + append_size))) { + STORAGE_LOG(WARN, "failed to expand size", K(ret), K(len_), K(append_size)); + } + } + + return ret; +} + +int ObCompactionBuffer::write_nop(const int64_t size, bool is_zero) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(size < 0)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(capacity_)); + } else if (OB_FAIL(ensure_space(size))) { + if (ret != OB_BUF_NOT_ENOUGH) { + STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size)); + } + } else { + if (is_zero) { + MEMSET(data_ + len_, 0, size); + } + len_ += size; + } + + return ret; +} + +int ObCompactionBuffer::write(const void *buf, int64_t size) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(buf == nullptr || size < 0)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(buf), K(size), K(len_), K(capacity_)); + } else if (OB_FAIL(ensure_space(size))) { + if (ret != OB_BUF_NOT_ENOUGH) { + STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(size)); + } + } else { + MEMCPY(data_ + len_, buf, size); + len_ += size; + } + + return ret; +} + +int ObCompactionBuffer::advance(const int64_t size) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(size < 0 || len_ + size > buffer_size_)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(size), K(len_), K(buffer_size_)); + } else { + len_ += size; + } + return ret; +} + +int ObCompactionBuffer::set_length(const int64_t len) +{ + int ret = OB_SUCCESS; + + if (OB_UNLIKELY(len > buffer_size_)) { + ret = OB_INVALID_ARGUMENT; + STORAGE_LOG(WARN, "invalid argument", K(ret), K(len), K(len_), K(buffer_size_)); + } else { + len_ = len; + } + return ret; +} + } //compaction } //oceanbase diff --git a/src/storage/compaction/ob_compaction_memory_context.h b/src/storage/compaction/ob_compaction_memory_context.h index 2da823e45d..91f136eb02 100644 --- a/src/storage/compaction/ob_compaction_memory_context.h +++ b/src/storage/compaction/ob_compaction_memory_context.h @@ -264,6 +264,96 @@ private: common::ObSpinLock lock_; }; +class ObCompactionBuffer +{ +public: + ObCompactionBuffer(const lib::ObLabel &label = "compaction_buf", const int64_t page_size = DEFAULT_MIDDLE_BLOCK_SIZE) + : allocator_(MTL_ID(), label), + is_inited_(false), + capacity_(0), + buffer_size_(0), + len_(0), + data_(nullptr), + reset_memory_threshold_(0), + memory_reclaim_cnt_(0), + has_expand_(false) + {} + virtual ~ObCompactionBuffer() { reset(); }; + int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE); + inline bool is_inited() const { return is_inited_; } + inline int64_t remain() const { return capacity_ - len_; } + inline int64_t remain_buffer_size() const { return buffer_size_ - len_; } + inline int64_t size() const { return buffer_size_; } //curr buffer size + inline bool has_expand() const { return has_expand_; } + inline char *data() { return data_; } + inline char *current() { return data_ + len_; } + int reserve(const int64_t size); + int ensure_space(const int64_t append_size); + inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); } + int write_nop(const int64_t size, bool is_zero = false); + int write(const void *buf, int64_t size); + + template + typename std::enable_if::type write(const T &value) + { + int ret = OB_SUCCESS; + static_assert(std::is_pod::value, "invalid type"); + if (OB_FAIL(ensure_space(sizeof(T)))) { + if (ret != OB_BUF_NOT_ENOUGH) { + STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(sizeof(T))); + } + } else { + *((T *)(data_ + len_)) = value; + len_ += sizeof(T); + } + return ret; + } + + template + typename std::enable_if::type write(const T &value) + { + int ret = OB_SUCCESS; + if (OB_FAIL(ensure_space(value.get_serialize_size()))) { + if (ret != OB_BUF_NOT_ENOUGH) { + STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(value.get_serialize_size())); + } + } else if (OB_FAIL(value.serialize(data_, buffer_size_, len_))) { + STORAGE_LOG(WARN, "fail to serialize", K(ret), K(buffer_size_), K(len_)); + } + return ret; + } + int advance(const int64_t size); + int set_length(const int64_t len); + + void reuse(); + void reset(); + inline int64_t length() const { return len_; } + TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold), + K_(memory_reclaim_cnt), K_(has_expand)); +protected: + bool check_could_expand() { return capacity_ > buffer_size_; } + int expand(const int64_t size); +private: + compaction::ObLocalAllocator allocator_; + bool is_inited_; + int64_t capacity_; + int64_t buffer_size_; //curr buffer size + int64_t len_; //curr pos + char *data_; + + // for reclaim memory + int64_t default_reserve_; + int64_t reset_memory_threshold_; + int64_t memory_reclaim_cnt_; + bool has_expand_; + +protected: + static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb + static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m + static const int64_t DEFAULT_MIDDLE_BLOCK_SIZE = 1 << 16; //64K + static const int64_t DEFAULT_RESET_MEMORY_THRESHOLD = 5; +}; + } // compaction } // oceanbase diff --git a/src/storage/compaction/ob_partition_merge_iter.cpp b/src/storage/compaction/ob_partition_merge_iter.cpp index 4b551997da..1e5b73b095 100644 --- a/src/storage/compaction/ob_partition_merge_iter.cpp +++ b/src/storage/compaction/ob_partition_merge_iter.cpp @@ -559,7 +559,7 @@ int ObPartitionMacroMergeIter::open_curr_range(const bool for_rewrite, const boo LOG_WARN("fail to switch_query_range", K(ret)); } } else { - iter->reset(); + iter->reuse(); if (OB_FAIL(iter->open( access_param_.iter_param_, access_context_, @@ -712,7 +712,7 @@ int ObPartitionMacroMergeIter::exist(const ObDatumRow &row, bool &is_exist) LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_)); } } else { - iter->reset(); + iter->reuse(); if (OB_FAIL(iter->open( access_param_.iter_param_, access_context_, @@ -773,7 +773,7 @@ int ObPartitionMacroMergeIter::check_row_changed(const blocksstable::ObDatumRow LOG_WARN("fail to switch_query_range", K(ret), K(cs_datum_range_)); } } else { - iter->reset(); + iter->reuse(); if (OB_FAIL(iter->open( access_param_.iter_param_, access_context_, @@ -1811,7 +1811,7 @@ int ObPartitionMinorMacroMergeIter::open_curr_macro_block() LOG_WARN("Unepxcted opened macro block to open", K(ret)); } else { ObSSTableRowWholeScanner *iter = reinterpret_cast(row_iter_); - iter->reset(); + iter->reuse(); if (OB_FAIL(iter->open( access_param_.iter_param_, access_context_,