[opt] micro buffer writer

This commit is contained in:
obdev
2024-02-06 19:17:42 +00:00
committed by ob-robot
parent ef9e453668
commit 618a949210
6 changed files with 126 additions and 110 deletions

View File

@ -93,6 +93,7 @@ int ObMicroBlockEncoder::try_encoder(ObIColumnEncoder *&encoder, const int64_t c
#define MICRO_BLOCK_PAGE_ALLOCATOR ModulePageAllocator(common::ObModIds::OB_ENCODER_ALLOCATOR, MTL_ID())
ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
encoding_meta_allocator_(),
data_buffer_(),
datum_rows_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
@ -113,6 +114,7 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
length_(0),
is_inited_(false)
{
encoding_meta_allocator_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
datum_rows_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
all_col_datums_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
encoders_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
@ -207,6 +209,7 @@ void ObMicroBlockEncoder::reset()
ObIMicroBlockWriter::reset();
is_inited_ = false;
//ctx_
encoding_meta_allocator_.reset();
data_buffer_.reset();
datum_rows_.reset();
FOREACH(cv, all_col_datums_) {
@ -242,6 +245,7 @@ void ObMicroBlockEncoder::reuse()
ObIMicroBlockWriter::reuse();
// is_inited_
// ctx_
encoding_meta_allocator_.reuse();
data_buffer_.reuse();
datum_rows_.reuse();
FOREACH(c, all_col_datums_) {
@ -497,7 +501,7 @@ int ObMicroBlockEncoder::reserve_header(const ObMicroBlockEncodingCtx &ctx)
}
return ret;
}
int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta_offset)
int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(ObBufferWriter &buf_writer, int64_t &encoding_meta_offset)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -529,37 +533,32 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta
LOG_WARN("advance data buffer failed", K(ret), K(col_header_size), K(encoding_meta_offset));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); ++i) {
int64_t pos_bak = data_buffer_.length();
int64_t pos_bak = buf_writer.length();
ObIColumnEncoder::EncoderDesc &desc = encoders_.at(i)->get_desc();
ObBufferWriter buffer_writer(data_buffer_.data(), data_buffer_.size(), data_buffer_.length());
if (OB_FAIL(encoders_.at(i)->store_meta(buffer_writer))) {
if (OB_FAIL(encoders_.at(i)->store_meta(buf_writer))) {
LOG_WARN("store encoding meta failed", K(ret));
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
} else {
ObColumnHeader &ch = encoders_.at(i)->get_column_header();
if (data_buffer_.length() > pos_bak) {
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
ch.length_ = static_cast<uint32_t>(data_buffer_.length() - pos_bak);
if (buf_writer.length() > pos_bak) {
ch.offset_ = static_cast<uint32_t>(pos_bak);
ch.length_ = static_cast<uint32_t>(buf_writer.length() - pos_bak);
} else if (ObColumnHeader::RAW == encoders_.at(i)->get_type()) {
// column header offset records the start pos of the fix data, if needed
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
ch.offset_ = static_cast<uint32_t>(pos_bak);
}
ch.obj_type_ = static_cast<uint8_t>(encoders_.at(i)->get_obj_type());
}
if (OB_SUCC(ret) && !desc.is_var_data_ && desc.need_data_store_) {
if (OB_FAIL(encoders_.at(i)->store_fix_data(buffer_writer))) {
if (OB_FAIL(encoders_.at(i)->store_fix_data(buf_writer))) {
LOG_WARN("failed to store fixed data", K(ret));
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
}
}
}
}
if (OB_SUCC(ret)) {
get_header(data_buffer_)->row_data_offset_ = static_cast<int32_t>(data_buffer_.length());
get_header(data_buffer_)->row_data_offset_ = static_cast<int32_t>(encoding_meta_offset + buf_writer.length());
}
}
return ret;
@ -570,6 +569,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
int ret = OB_SUCCESS;
int64_t encoders_need_size = 0;
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
char *encoding_meta_buf = nullptr;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -586,14 +586,23 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
LOG_WARN("detect column encoding failed", K(ret));
} else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) {
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
// encoder pointers depend on this memory during build_block(), and its life cycyle needs to be longer than build_block().
} else if (OB_ISNULL(encoding_meta_buf = static_cast<char *>(encoding_meta_allocator_.alloc(encoders_need_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size));
} else {
STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct),
K(datum_rows_.count()), K(ctx_));
// <1> store encoding metas and fix cols data
// <1> store encoding metas and fix cols data in encoding_meta_buffer
int64_t encoding_meta_offset = 0;
if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) {
int64_t encoding_meta_size = 0;
ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0);
if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) {
LOG_WARN("failed to store encoding meta and fixed col data", K(ret));
} else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) {
} else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) {
STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_));
}
// <2> set row data store offset
@ -639,7 +648,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
}
}
// <5> fill header
// <5> fill header, encoding_meta and fix cols data
if (OB_SUCC(ret)) {
get_header(data_buffer_)->row_count_ = static_cast<uint32_t>(datum_rows_.count());
get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_;
@ -651,6 +660,8 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader));
data += sizeof(ObColumnHeader);
}
// fill encoding meta and fix cols data
MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size);
}
if (OB_SUCC(ret)) {
@ -815,8 +826,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
LOG_WARN("reserve array failed", K(ret), "count", var_data_encoders_.count());
} else if (OB_FAIL(row_indexs_.push_back(0))) {
LOG_WARN("add row index failed", K(ret));
} else if (OB_FAIL(data_buffer_.set_lazy_move_cur_buf())) {
STORAGE_LOG(WARN, "fail to set lazy move", K(ret), K(data_buffer_));
} else {
const int64_t row_data_offset = get_header(data_buffer_)->row_data_offset_;
for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) {
@ -900,9 +909,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
}
}
}
if (OB_SUCC(ret)) {
data_buffer_.move_buf();
}
}
return ret;