diff --git a/src/storage/blocksstable/encoding/ob_micro_block_encoder.cpp b/src/storage/blocksstable/encoding/ob_micro_block_encoder.cpp index 4d9181e798..2ff8648beb 100644 --- a/src/storage/blocksstable/encoding/ob_micro_block_encoder.cpp +++ b/src/storage/blocksstable/encoding/ob_micro_block_encoder.cpp @@ -93,6 +93,7 @@ int ObMicroBlockEncoder::try_encoder(ObIColumnEncoder *&encoder, const int64_t c #define MICRO_BLOCK_PAGE_ALLOCATOR ModulePageAllocator(common::ObModIds::OB_ENCODER_ALLOCATOR, MTL_ID()) ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL), + encoding_meta_allocator_(), data_buffer_(), datum_rows_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR), all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR), @@ -113,6 +114,7 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL), length_(0), is_inited_(false) { + encoding_meta_allocator_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder")); datum_rows_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder")); all_col_datums_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder")); encoders_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder")); @@ -207,6 +209,7 @@ void ObMicroBlockEncoder::reset() ObIMicroBlockWriter::reset(); is_inited_ = false; //ctx_ + encoding_meta_allocator_.reset(); data_buffer_.reset(); datum_rows_.reset(); FOREACH(cv, all_col_datums_) { @@ -242,6 +245,7 @@ void ObMicroBlockEncoder::reuse() ObIMicroBlockWriter::reuse(); // is_inited_ // ctx_ + encoding_meta_allocator_.reuse(); data_buffer_.reuse(); datum_rows_.reuse(); FOREACH(c, all_col_datums_) { @@ -497,7 +501,7 @@ int ObMicroBlockEncoder::reserve_header(const ObMicroBlockEncodingCtx &ctx) } return ret; } -int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta_offset) +int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(ObBufferWriter &buf_writer, int64_t &encoding_meta_offset) { int ret = OB_SUCCESS; if (IS_NOT_INIT) { @@ -529,37 +533,32 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta LOG_WARN("advance data buffer failed", K(ret), K(col_header_size), K(encoding_meta_offset)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); ++i) { - int64_t pos_bak = data_buffer_.length(); + int64_t pos_bak = buf_writer.length(); ObIColumnEncoder::EncoderDesc &desc = encoders_.at(i)->get_desc(); - ObBufferWriter buffer_writer(data_buffer_.data(), data_buffer_.size(), data_buffer_.length()); - if (OB_FAIL(encoders_.at(i)->store_meta(buffer_writer))) { + if (OB_FAIL(encoders_.at(i)->store_meta(buf_writer))) { LOG_WARN("store encoding meta failed", K(ret)); - } else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) { - STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_)); } else { ObColumnHeader &ch = encoders_.at(i)->get_column_header(); - if (data_buffer_.length() > pos_bak) { - ch.offset_ = static_cast(pos_bak - encoding_meta_offset); - ch.length_ = static_cast(data_buffer_.length() - pos_bak); + if (buf_writer.length() > pos_bak) { + ch.offset_ = static_cast(pos_bak); + ch.length_ = static_cast(buf_writer.length() - pos_bak); } else if (ObColumnHeader::RAW == encoders_.at(i)->get_type()) { // column header offset records the start pos of the fix data, if needed - ch.offset_ = static_cast(pos_bak - encoding_meta_offset); + ch.offset_ = static_cast(pos_bak); } ch.obj_type_ = static_cast(encoders_.at(i)->get_obj_type()); } if (OB_SUCC(ret) && !desc.is_var_data_ && desc.need_data_store_) { - if (OB_FAIL(encoders_.at(i)->store_fix_data(buffer_writer))) { + if (OB_FAIL(encoders_.at(i)->store_fix_data(buf_writer))) { LOG_WARN("failed to store fixed data", K(ret)); - } else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) { - STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_)); } } } } if (OB_SUCC(ret)) { - get_header(data_buffer_)->row_data_offset_ = static_cast(data_buffer_.length()); + get_header(data_buffer_)->row_data_offset_ = static_cast(encoding_meta_offset + buf_writer.length()); } } return ret; @@ -570,6 +569,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size) int ret = OB_SUCCESS; int64_t encoders_need_size = 0; const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader)); + char *encoding_meta_buf = nullptr; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); @@ -586,14 +586,23 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size) LOG_WARN("detect column encoding failed", K(ret)); } else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) { STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_)); + // encoder pointers depend on this memory during build_block(), and its life cycyle needs to be longer than build_block(). + } else if (OB_ISNULL(encoding_meta_buf = static_cast(encoding_meta_allocator_.alloc(encoders_need_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size)); } else { STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct), K(datum_rows_.count()), K(ctx_)); - // <1> store encoding metas and fix cols data + // <1> store encoding metas and fix cols data in encoding_meta_buffer int64_t encoding_meta_offset = 0; - if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) { + int64_t encoding_meta_size = 0; + ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0); + if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) { LOG_WARN("failed to store encoding meta and fixed col data", K(ret)); + } else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) { + } else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) { + STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_)); } // <2> set row data store offset @@ -639,7 +648,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size) } } - // <5> fill header + // <5> fill header, encoding_meta and fix cols data if (OB_SUCC(ret)) { get_header(data_buffer_)->row_count_ = static_cast(datum_rows_.count()); get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_; @@ -651,6 +660,8 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size) MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader)); data += sizeof(ObColumnHeader); } + // fill encoding meta and fix cols data + MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size); } if (OB_SUCC(ret)) { @@ -815,8 +826,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size) LOG_WARN("reserve array failed", K(ret), "count", var_data_encoders_.count()); } else if (OB_FAIL(row_indexs_.push_back(0))) { LOG_WARN("add row index failed", K(ret)); - } else if (OB_FAIL(data_buffer_.set_lazy_move_cur_buf())) { - STORAGE_LOG(WARN, "fail to set lazy move", K(ret), K(data_buffer_)); } else { const int64_t row_data_offset = get_header(data_buffer_)->row_data_offset_; for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) { @@ -900,9 +909,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size) } } } - if (OB_SUCC(ret)) { - data_buffer_.move_buf(); - } } return ret; diff --git a/src/storage/blocksstable/encoding/ob_micro_block_encoder.h b/src/storage/blocksstable/encoding/ob_micro_block_encoder.h index d9033432a8..ed6c6d502b 100644 --- a/src/storage/blocksstable/encoding/ob_micro_block_encoder.h +++ b/src/storage/blocksstable/encoding/ob_micro_block_encoder.h @@ -143,7 +143,7 @@ private: void update_estimate_size_limit(const ObMicroBlockEncodingCtx &ctx); - int store_encoding_meta_and_fix_cols(int64_t &encoding_meta_offset); + int store_encoding_meta_and_fix_cols(ObBufferWriter &buf_writer, int64_t &encoding_meta_offset); int init_all_col_values(const ObMicroBlockEncodingCtx &ctx); void print_micro_block_encoder_status() const; int set_datum_rows_ptr(); @@ -151,6 +151,7 @@ private: private: ObMicroBlockEncodingCtx ctx_; ObMicroBlockHeader *header_; + ObArenaAllocator encoding_meta_allocator_; ObMicroBufferWriter data_buffer_; ObConstDatumRowArray datum_rows_; common::ObArray all_col_datums_; diff --git a/src/storage/blocksstable/ob_imicro_block_writer.cpp b/src/storage/blocksstable/ob_imicro_block_writer.cpp index 9ecdd4d5a9..2c8faa0f23 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.cpp +++ b/src/storage/blocksstable/ob_imicro_block_writer.cpp @@ -90,16 +90,10 @@ int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size void ObMicroBufferWriter::reset() { - if (old_buf_ != nullptr) { - allocator_.free(old_buf_); - old_buf_ = nullptr; - } if (data_ != nullptr) { allocator_.free(data_); data_ = nullptr; } - old_size_ = 0; - lazy_move_ = false; has_expand_ = false; memory_reclaim_cnt_ = 0; reset_memory_threshold_ = 0; @@ -113,11 +107,6 @@ void ObMicroBufferWriter::reset() void ObMicroBufferWriter::reuse() { - if (old_buf_ != nullptr) { - int ret = OB_ERR_SYS; - STORAGE_LOG(ERROR, "unexcpected old buf", K(ret), K(*this)); - abort(); - } if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) { memory_reclaim_cnt_++; if (memory_reclaim_cnt_ >= reset_memory_threshold_) { @@ -136,8 +125,6 @@ void ObMicroBufferWriter::reuse() } else { memory_reclaim_cnt_ = 0; } - old_size_ = 0; - lazy_move_ = false; has_expand_ = false; len_ = 0; } @@ -179,14 +166,8 @@ int ObMicroBufferWriter::reserve(const int64_t size) STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size)); } else if (data_ != nullptr) { has_expand_ = true; - if (lazy_move_) { - lazy_move_ = false; - old_buf_ = data_; - old_size_ = len_; - } else { - MEMCPY(buf, data_, len_); - allocator_.free(data_); - } + MEMCPY(buf, data_, len_); + allocator_.free(data_); data_ = nullptr; } if (OB_SUCC(ret)) { diff --git a/src/storage/blocksstable/ob_imicro_block_writer.h b/src/storage/blocksstable/ob_imicro_block_writer.h index b180114e82..9598b4a8a8 100644 --- a/src/storage/blocksstable/ob_imicro_block_writer.h +++ b/src/storage/blocksstable/ob_imicro_block_writer.h @@ -99,10 +99,7 @@ public: data_(nullptr), reset_memory_threshold_(0), memory_reclaim_cnt_(0), - has_expand_(false), - lazy_move_(false), - old_buf_(nullptr), - old_size_(0) + has_expand_(false) {} ~ObMicroBufferWriter() { reset(); }; int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE); @@ -111,32 +108,10 @@ public: inline int64_t remain_buffer_size() const { return buffer_size_ - len_; } inline int64_t size() const { return buffer_size_; } //curr buffer size inline bool has_expand() const { return has_expand_; } - inline char *data() { assert(old_buf_ == nullptr); return data_; } + inline char *data() { return data_; } inline char *current() { return data_ + len_; } int reserve(const int64_t size); int ensure_space(const int64_t append_size); - // don't use it, only for encoding - int set_lazy_move_cur_buf() - { - int ret = OB_SUCCESS; - if (OB_UNLIKELY(old_buf_ != nullptr)) { - ret = OB_ERR_UNEXPECTED; - STORAGE_LOG(WARN, "unexpected old buf", K(ret)); - } else { - lazy_move_ = true; - } - return ret; - } - void move_buf() - { - lazy_move_ = false; - if (old_buf_ != nullptr) { - MEMCPY(data_, old_buf_, old_size_); - allocator_.free(old_buf_); - old_buf_ = nullptr; - old_size_ = 0; - } - } inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); } int write_nop(const int64_t size, bool is_zero = false); int write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len); @@ -163,7 +138,7 @@ public: void reset(); inline int64_t length() const { return len_; } TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold), - K_(memory_reclaim_cnt), K_(has_expand), K_(lazy_move), K_(old_buf), K_(old_size)); + K_(memory_reclaim_cnt), K_(has_expand)); private: int expand(const int64_t size); private: @@ -180,10 +155,6 @@ private: int64_t memory_reclaim_cnt_; bool has_expand_; - bool lazy_move_; - char *old_buf_; - int64_t old_size_; - private: static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m diff --git a/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h b/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h index 01d91dba6a..03fd928d69 100644 --- a/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h +++ b/unittest/storage/blocksstable/cs_encoding/test_decoder_filter_perf.h @@ -305,11 +305,13 @@ public: int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) { int ret = OB_SUCCESS; - int64_t need_size; - if (!is_inited_) { + int64_t encoders_need_size = 0; + const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader)); + char *encoding_meta_buf = nullptr; + if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (datum_rows_.empty()) { + } else if (OB_UNLIKELY(datum_rows_.empty())) { ret = OB_INNER_STAT_ERROR; LOG_WARN("empty micro block", K(ret)); } else if (OB_FAIL(set_datum_rows_ptr())) { @@ -318,13 +320,11 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) LOG_WARN("pivot rows to columns failed", K(ret)); } else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) { LOG_WARN("array reserve failed", K(ret), "count", datum_rows_.count()); - } else if (OB_FAIL(encoder_detection(need_size))) { + } else if (OB_FAIL(encoder_detection(encoders_need_size))) { LOG_WARN("detect column encoding failed", K(ret)); - } else if (OB_FAIL(data_buffer_.ensure_space(1<<20))) { - STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_)); } else { - - for (int64_t i = 0; i < ctx_.column_cnt_; ++i) { + encoders_need_size = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < ctx_.column_cnt_; ++i) { const bool force_var_store = false; if (NULL != encoders_[i]) { free_encoder(encoders_[i]); @@ -341,12 +341,36 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) encoders_[i] = e; } } + for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); i++) { + int64_t need_size = 0; + if (OB_FAIL(encoders_.at(i)->get_encoding_store_meta_need_space(need_size))) { + STORAGE_LOG(WARN, "fail to get_encoding_store_meta_need_space", K(ret), K(i), K(encoders_)); + } else { + need_size += encoders_.at(i)->calc_encoding_fix_data_need_space(); + encoders_need_size += need_size; + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) { + STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_)); + } else if (OB_ISNULL(encoding_meta_buf = static_cast(encoding_meta_allocator_.alloc(encoders_need_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size)); + } else { + STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct), + K(datum_rows_.count()), K(ctx_)); - // <1> store encoding metas and fix cols data + // <1> store encoding metas and fix cols data in encoding_meta_buffer int64_t encoding_meta_offset = 0; - if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) { + int64_t encoding_meta_size = 0; + ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0); + if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) { LOG_WARN("failed to store encoding meta and fixed col data", K(ret)); + } else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) { + } else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) { + STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_)); } // <2> set row data store offset @@ -355,7 +379,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) if (OB_FAIL(set_row_data_pos(fix_data_size))) { LOG_WARN("set row data position failed", K(ret)); } else { - get_header(data_buffer_)->var_column_count_ = static_cast(var_data_encoders_.count()); + get_header(data_buffer_)->var_column_count_ = static_cast(var_data_encoders_.count()); } } @@ -377,10 +401,12 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } ObIntegerArrayGenerator gen; const int64_t row_index_size = row_indexs_.count() * get_header(data_buffer_)->row_index_byte_; - if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) { + if (OB_FAIL(data_buffer_.ensure_space(row_index_size))) { + STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(row_index_size), K(data_buffer_)); + } else if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) { LOG_WARN("init integer array generator failed", K(ret), "byte", get_header(data_buffer_)->row_index_byte_); - } else if (OB_FAIL(data_buffer_.write_nop(row_index_size, true))) { + } else if (OB_FAIL(data_buffer_.write_nop(row_index_size))) { LOG_WARN("advance data buffer failed", K(ret), K(row_index_size)); } else { for (int64_t idx = 0; idx < row_indexs_.count(); ++idx) { @@ -390,19 +416,20 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } } - // <5> fill header + // <5> fill header, encoding_meta and fix cols data if (OB_SUCC(ret)) { - get_header(data_buffer_)->row_count_ = static_cast(datum_rows_.count()); + get_header(data_buffer_)->row_count_ = static_cast(datum_rows_.count()); get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_; get_header(data_buffer_)->all_lob_in_row_ = !has_lob_out_row_; - - + get_header(data_buffer_)->max_merged_trans_version_ = max_merged_trans_version_; const int64_t header_size = get_header(data_buffer_)->header_size_; char *data = data_buffer_.data() + header_size; FOREACH(e, encoders_) { MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader)); data += sizeof(ObColumnHeader); } + // fill encoding meta and fix cols data + MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size); } if (OB_SUCC(ret)) { @@ -415,7 +442,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) ObIColumnEncoder *e = encoders_.at(idx); pe.type_ = static_cast(e->get_column_header().type_); if (ObColumnHeader::is_inter_column_encoder(pe.type_)) { - pe.ref_col_idx_ = static_cast(e)->get_ref_col_idx(); + pe.ref_col_idx_ = static_cast(e)->get_ref_col_idx(); } else { pe.ref_col_idx_ = 0; } diff --git a/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp b/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp index d63e9b695e..9dfa4f65b4 100644 --- a/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp +++ b/unittest/storage/blocksstable/encoding/test_raw_decoder.cpp @@ -53,11 +53,13 @@ public: int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) { int ret = OB_SUCCESS; - int64_t need_size = 0; - if (!is_inited_) { + int64_t encoders_need_size = 0; + const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader)); + char *encoding_meta_buf = nullptr; + if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("not init", K(ret)); - } else if (datum_rows_.empty()) { + } else if (OB_UNLIKELY(datum_rows_.empty())) { ret = OB_INNER_STAT_ERROR; LOG_WARN("empty micro block", K(ret)); } else if (OB_FAIL(set_datum_rows_ptr())) { @@ -66,11 +68,11 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) LOG_WARN("pivot rows to columns failed", K(ret)); } else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) { LOG_WARN("array reserve failed", K(ret), "count", datum_rows_.count()); - } else if (OB_FAIL(encoder_detection(need_size))) { + } else if (OB_FAIL(encoder_detection(encoders_need_size))) { LOG_WARN("detect column encoding failed", K(ret)); } else { - - for (int64_t i = 0; i < ctx_.column_cnt_; ++i) { + encoders_need_size = 0; + for (int64_t i = 0; OB_SUCC(ret) && i < ctx_.column_cnt_; ++i) { const bool force_var_store = false; if (NULL != encoders_[i]) { free_encoder(encoders_[i]); @@ -87,12 +89,36 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) encoders_[i] = e; } } + for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); i++) { + int64_t need_size = 0; + if (OB_FAIL(encoders_.at(i)->get_encoding_store_meta_need_space(need_size))) { + STORAGE_LOG(WARN, "fail to get_encoding_store_meta_need_space", K(ret), K(i), K(encoders_)); + } else { + need_size += encoders_.at(i)->calc_encoding_fix_data_need_space(); + encoders_need_size += need_size; + } + } + } + if (OB_FAIL(ret)) { + } else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) { + STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_)); + } else if (OB_ISNULL(encoding_meta_buf = static_cast(encoding_meta_allocator_.alloc(encoders_need_size)))) { + ret = OB_ALLOCATE_MEMORY_FAILED; + STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size)); + } else { + STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct), + K(datum_rows_.count()), K(ctx_)); - // <1> store encoding metas and fix cols data + // <1> store encoding metas and fix cols data in encoding_meta_buffer int64_t encoding_meta_offset = 0; - if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) { + int64_t encoding_meta_size = 0; + ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0); + if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) { LOG_WARN("failed to store encoding meta and fixed col data", K(ret)); + } else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) { + } else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) { + STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_)); } // <2> set row data store offset @@ -101,7 +127,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) if (OB_FAIL(set_row_data_pos(fix_data_size))) { LOG_WARN("set row data position failed", K(ret)); } else { - get_header(data_buffer_)->var_column_count_ = static_cast(var_data_encoders_.count()); + get_header(data_buffer_)->var_column_count_ = static_cast(var_data_encoders_.count()); } } @@ -123,10 +149,12 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } ObIntegerArrayGenerator gen; const int64_t row_index_size = row_indexs_.count() * get_header(data_buffer_)->row_index_byte_; - if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) { + if (OB_FAIL(data_buffer_.ensure_space(row_index_size))) { + STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(row_index_size), K(data_buffer_)); + } else if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) { LOG_WARN("init integer array generator failed", K(ret), "byte", get_header(data_buffer_)->row_index_byte_); - } else if (OB_FAIL(data_buffer_.write_nop(row_index_size, true))) { + } else if (OB_FAIL(data_buffer_.write_nop(row_index_size))) { LOG_WARN("advance data buffer failed", K(ret), K(row_index_size)); } else { for (int64_t idx = 0; idx < row_indexs_.count(); ++idx) { @@ -136,19 +164,20 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) } } - // <5> fill header + // <5> fill header, encoding_meta and fix cols data if (OB_SUCC(ret)) { - get_header(data_buffer_)->row_count_ = static_cast(datum_rows_.count()); + get_header(data_buffer_)->row_count_ = static_cast(datum_rows_.count()); get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_; get_header(data_buffer_)->all_lob_in_row_ = !has_lob_out_row_; - - + get_header(data_buffer_)->max_merged_trans_version_ = max_merged_trans_version_; const int64_t header_size = get_header(data_buffer_)->header_size_; char *data = data_buffer_.data() + header_size; FOREACH(e, encoders_) { MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader)); data += sizeof(ObColumnHeader); } + // fill encoding meta and fix cols data + MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size); } if (OB_SUCC(ret)) { @@ -161,7 +190,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size) ObIColumnEncoder *e = encoders_.at(idx); pe.type_ = static_cast(e->get_column_header().type_); if (ObColumnHeader::is_inter_column_encoder(pe.type_)) { - pe.ref_col_idx_ = static_cast(e)->get_ref_col_idx(); + pe.ref_col_idx_ = static_cast(e)->get_ref_col_idx(); } else { pe.ref_col_idx_ = 0; } @@ -214,6 +243,7 @@ public: decode_res_pool_ = new(allocator_.alloc(sizeof(ObDecodeResourcePool))) ObDecodeResourcePool; tenant_ctx_.set(decode_res_pool_); share::ObTenantEnv::set_tenant(&tenant_ctx_); + encoder_.encoding_meta_allocator_.set_tenant_id(OB_SERVER_TENANT_ID); encoder_.data_buffer_.allocator_.set_tenant_id(OB_SERVER_TENANT_ID); encoder_.row_buf_holder_.allocator_.set_tenant_id(OB_SERVER_TENANT_ID); decode_res_pool_->init();