Merge branch 'column_store'

Co-authored-by: wangt1xiuyi <13547954130@163.com>
Co-authored-by: yangqise7en <877793735@qq.com>
Co-authored-by: Zach41 <zach_41@163.com>
This commit is contained in:
chaser-ch
2023-10-31 15:39:20 +00:00
committed by ob-robot
parent 4057fbc4ae
commit 566e920620
1375 changed files with 239147 additions and 56014 deletions

View File

@ -91,14 +91,25 @@ int ObMicroBlockEncoder::try_encoder(ObIColumnEncoder *&encoder, const int64_t c
return ret;
}
#define MICRO_BLOCK_PAGE_ALLOCATOR ModulePageAllocator(common::ObModIds::OB_ENCODER_ALLOCATOR, MTL_ID())
ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
data_buffer_(blocksstable::OB_ENCODING_LABEL_DATA_BUFFER),
datum_rows_(), all_col_datums_(),
buffered_rows_checksum_(0), estimate_size_(0), estimate_size_limit_(0),
data_buffer_(),
datum_rows_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
estimate_size_(0), estimate_size_limit_(0),
header_size_(0), expand_pct_(DEFAULT_ESTIMATE_REAL_SIZE_PCT),
row_buf_holder_(blocksstable::OB_ENCODING_LABEL_ROW_BUFFER, OB_MALLOC_MIDDLE_BLOCK_SIZE),
row_buf_holder_(),
encoders_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
fix_data_encoders_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
var_data_encoders_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
encoder_allocator_(encoder_sizes, ObMemAttr(MTL_ID(), common::ObModIds::OB_ENCODER_ALLOCATOR)),
string_col_cnt_(0), estimate_base_store_size_(0), length_(0),
row_indexs_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
hashtables_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
multi_prefix_trees_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
deep_copy_indexes_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
string_col_cnt_(0), estimate_base_store_size_(0),
col_ctxs_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
length_(0),
is_inited_(false)
{
datum_rows_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
@ -112,20 +123,13 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
deep_copy_indexes_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
col_ctxs_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
}
#undef MICRO_BLOCK_PAGE_ALLOCATOR
ObMicroBlockEncoder::~ObMicroBlockEncoder()
{
reset();
}
int64_t ObMicroBlockEncoder::get_data_size() const {
int64_t data_size = data_buffer_.length();
if (data_size == 0 && is_inited_) { //lazy allocate
data_size = ObMicroBlockHeader::get_serialize_size(
ctx_.column_cnt_, ctx_.need_calc_column_chksum_);
}
return data_size;
}
int ObMicroBlockEncoder::init(const ObMicroBlockEncodingCtx &ctx)
{
// can be init twice
@ -150,8 +154,8 @@ int ObMicroBlockEncoder::init(const ObMicroBlockEncodingCtx &ctx)
LOG_WARN("reserve array failed", K(ret), "size", ctx.column_cnt_);
} else if (OB_FAIL(init_all_col_values(ctx))) {
LOG_WARN("init all_col_values failed", K(ret), K(ctx));
} else if (OB_FAIL(row_buf_holder_.init(ctx.macro_block_size_, MTL_ID()))) {
LOG_WARN("init row buf holder failed", K(ret));
} else if (OB_FAIL(checksum_helper_.init(ctx.col_descs_, true/*need_opt_row_checksum*/))) {
LOG_WARN("fail to init checksum_helper", K(ret), K(ctx));
} else {
// TODO bin.lb: shrink all_col_values_ size
estimate_base_store_size_ = 0;
@ -174,27 +178,34 @@ int ObMicroBlockEncoder::inner_init()
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("fail to reserve enough space for micro block buffer", K(ret));
} else if (data_buffer_.is_dirty()) {
} else if (data_buffer_.length() > 0) {
// has been inner_inited, do nothing
} else if (OB_UNLIKELY(!ctx_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid ctx", K(ret), K_(ctx));
} else if (OB_FAIL(data_buffer_.ensure_space(DEFAULT_DATA_BUFFER_SIZE))) {
LOG_WARN("fail to reserve enough space for micro block buffer", K(ret));
} else if (OB_FAIL(reserve_header(ctx_))) {
LOG_WARN("fail to reserve micro header", K(ret));
} else {
update_estimate_size_limit(ctx_);
if (OB_UNLIKELY(!ctx_.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected invalid ctx", K(ret), K_(ctx));
} else if (!data_buffer_.is_inited()) {
if (OB_FAIL(data_buffer_.init(DEFAULT_DATA_BUFFER_SIZE))) {
STORAGE_LOG(WARN, "fail to init data buffer", K(ret));
} else if (OB_FAIL(row_buf_holder_.init(2 * DEFAULT_DATA_BUFFER_SIZE))) {
STORAGE_LOG(WARN, "fail to init row_buf_holder", K(ret));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(reserve_header(ctx_))) {
LOG_WARN("fail to reserve micro header", K(ret));
} else {
update_estimate_size_limit(ctx_);
}
}
return ret;
}
void ObMicroBlockEncoder::reset()
{
ObIMicroBlockWriter::reuse();
ObIMicroBlockWriter::reset();
is_inited_ = false;
//ctx_
header_ = nullptr;
data_buffer_.reset();
datum_rows_.reset();
FOREACH(cv, all_col_datums_) {
@ -227,18 +238,16 @@ void ObMicroBlockEncoder::reuse()
// is_inited_
// ctx_
data_buffer_.reuse();
header_ = nullptr;
datum_rows_.reuse();
FOREACH(c, all_col_datums_) {
(*c)->reuse();
}
buffered_rows_checksum_ = 0;
row_buf_holder_.reuse();
estimate_size_ = 0;
// estimate_size_limit_
// header_size_
// expand_pct_
// estimate_base_store_size_ should only recalculate with initialization
// row_buf_holder_ no need to reuse
fix_data_encoders_.reuse();
var_data_encoders_.reuse();
free_encoders();
@ -251,33 +260,10 @@ void ObMicroBlockEncoder::reuse()
length_ = 0;
}
int ObMicroBlockEncoder::calc_and_validate_checksum(const ObDatumRow &row)
{
int ret = OB_SUCCESS;
micro_block_checksum_ = cal_row_checksum(row, micro_block_checksum_);
if (OB_LIKELY(micro_block_merge_verify_level_ <
MICRO_BLOCK_MERGE_VERIFY_LEVEL::ENCODING_AND_COMPRESSION_AND_WRITE_COMPLETE)) {
// do nothing
} else if (OB_UNLIKELY(datum_rows_.count() <= 0)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected datum rows count", K(ret), K(datum_rows_.count()));
} else {
ObConstDatumRow &buf_row = datum_rows_.at(datum_rows_.count() - 1);
for (int64_t col_idx = 0; col_idx < buf_row.get_column_count(); ++col_idx) {
buffered_rows_checksum_ = buf_row.datums_[col_idx].checksum(buffered_rows_checksum_);
}
if (OB_UNLIKELY(buffered_rows_checksum_ != micro_block_checksum_)) {
ret = OB_CHECKSUM_ERROR;
LOG_DBA_ERROR(OB_CHECKSUM_ERROR, "msg", "micro block checksum is not equal", K(ret), K_(micro_block_checksum),
K_(buffered_rows_checksum), K(row), K(buf_row));
}
}
return ret;
}
void ObMicroBlockEncoder::dump_diagnose_info() const
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
@ -285,12 +271,19 @@ void ObMicroBlockEncoder::dump_diagnose_info() const
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected encoding ctx", K_(ctx));
} else {
ObIMicroBlockWriter::dump_diagnose_info();
print_micro_block_encoder_status();
ObDatumRow datum_row;
ObStoreRow store_row;
int64_t orig_checksum = 0;
const int64_t column_cnt = ctx_.column_cnt_;
if (OB_FAIL(datum_row.init(column_cnt))) {
ObArenaAllocator allocator("DumpMicroInfo");
ObMicroBlockChecksumHelper tmp_checksum_helper;
blocksstable::ObNewRowBuilder new_row_builder;
if (OB_FAIL(tmp_checksum_helper.init(ctx_.col_descs_, true))) {
LOG_WARN("Failed to init ObMicroBlockChecksumHelper", K(ret), KPC_(ctx_.col_descs));
} else if (OB_FAIL(new_row_builder.init(*ctx_.col_descs_, allocator))) {
LOG_WARN("Failed to init ObNewRowBuilder", K(ret), KPC_(ctx_.col_descs));
} else if (OB_FAIL(datum_row.init(column_cnt))) {
LOG_WARN("fail to init datum row", K_(ctx), K(column_cnt));
} else {
for (int64_t it = 0; OB_SUCC(ret) && it < datum_rows_.count(); ++it) {
@ -298,9 +291,12 @@ void ObMicroBlockEncoder::dump_diagnose_info() const
datum_row.storage_datums_[col_idx].ptr_ = datum_rows_.at(it).datums_[col_idx].ptr_;
datum_row.storage_datums_[col_idx].pack_ = datum_rows_.at(it).datums_[col_idx].pack_;
}
orig_checksum = ObIMicroBlockWriter::cal_row_checksum(datum_row, orig_checksum);
FLOG_WARN("error micro block row (original)", K(it), K(datum_row), K(orig_checksum));
if (OB_FAIL(datum_row.to_store_row(*ctx_.col_descs_, store_row))) {
if (OB_TMP_FAIL(tmp_checksum_helper.cal_row_checksum(datum_row.storage_datums_, column_cnt))) {
LOG_WARN("Failed to cal row checksum", K(ret), K(datum_row));
}
FLOG_WARN("error micro block row (original)", K(it), K(datum_row),
"orig_checksum", tmp_checksum_helper.get_row_checksum());
if (OB_FAIL(new_row_builder.build_store_row(datum_row, store_row))) {
LOG_WARN("Failed to transfer datum row to store row", K(ret), K(datum_row));
} else {
FLOG_WARN("error micro block store_row (original)", K(it), K(store_row));
@ -333,7 +329,7 @@ int ObMicroBlockEncoder::init_all_col_values(const ObMicroBlockEncodingCtx &ctx)
void ObMicroBlockEncoder::print_micro_block_encoder_status() const
{
FLOG_INFO("Build micro block failed, print encoder status: ", K_(ctx),
K_(header), K_(estimate_size), K_(estimate_size_limit), K_(header_size),
K_(estimate_size), K_(estimate_size_limit), K_(header_size),
K_(expand_pct), K_(string_col_cnt), K_(estimate_base_store_size), K_(length));
int64_t idx = 0;
FOREACH(e, encoders_) {
@ -362,7 +358,6 @@ void ObMicroBlockEncoder::update_estimate_size_limit(const ObMicroBlockEncodingC
int64_t data_size_limit = (2 * ctx.micro_block_size_ - header_size_) > 0
? (2 * ctx.micro_block_size_ - header_size_)
: (2 * ctx.micro_block_size_);
//TODO use 4.1.0.0 for version judgment
if(ctx.major_working_cluster_version_ >= DATA_VERSION_4_1_0_0 ) {
data_size_limit = MAX(data_size_limit, DEFAULT_MICRO_MAX_SIZE);
}
@ -396,14 +391,7 @@ int ObMicroBlockEncoder::append_row(const ObDatumRow &row)
LOG_WARN("failed to inner init", K(ret));
} else {
if (datum_rows_.empty()) {
// Allocate a block to hold all rows in one micro block, size is 3*estimate_size_limit_.
// The maximum size of a micro block is estimate_size_limit_, but set 3 times of it in memory
// due to memory struct need to store some other meta information.
// When micro block is too small, estimate_size_ could be 0. Each row bceomes a large row.
if (0 != estimate_size_limit_ && OB_FAIL(row_buf_holder_.try_alloc(3 * estimate_size_limit_))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("fail to try allocate memory for buf holder", K(ret), K_(3 * estimate_size_limit));
} else if (OB_FAIL(init_column_ctxs())) {
if (OB_FAIL(init_column_ctxs())) {
LOG_WARN("build deep copy indexes failed", K(ret), K_(ctx));
}
}
@ -412,6 +400,7 @@ int ObMicroBlockEncoder::append_row(const ObDatumRow &row)
if (OB_SUCC(ret)) {
int64_t store_size = 0;
ObConstDatumRow datum_row;
ObMicroBlockHeader *header = get_header(data_buffer_);
if (OB_UNLIKELY(datum_rows_.count() >= MAX_MICRO_BLOCK_ROW_CNT)) {
ret = OB_BUF_NOT_ENOUGH;
LOG_INFO("Try to encode more rows than maximum of row cnt in header, force to build a block",
@ -422,12 +411,14 @@ int ObMicroBlockEncoder::append_row(const ObDatumRow &row)
if (OB_UNLIKELY(OB_BUF_NOT_ENOUGH != ret)) {
LOG_WARN("copy and append row failed", K(ret));
}
} else if (header_->has_column_checksum_
&& OB_FAIL(cal_column_checksum(row, header_->column_checksums_))) {
} else if (header->has_column_checksum_ && OB_FAIL(checksum_helper_.cal_column_checksum(
row, header->column_checksums_))) {
LOG_WARN("cal column checksum failed", K(ret), K(row));
} else if (need_cal_row_checksum() && OB_FAIL(calc_and_validate_checksum(row))) {
LOG_WARN("fail to calc and validate row checksum", K(ret), K_(ctx));
} else {
if (need_cal_row_checksum()
&& OB_FAIL(checksum_helper_.cal_row_checksum(row.storage_datums_, row.get_column_count()))) {
STORAGE_LOG(WARN, "fail to cal row chksum", K(ret), K(row), K_(checksum_helper));
}
cal_row_stat(row);
estimate_size_ += store_size;
}
@ -479,23 +470,20 @@ int ObMicroBlockEncoder::reserve_header(const ObMicroBlockEncodingCtx &ctx)
} else {
int32_t header_size = ObMicroBlockHeader::get_serialize_size(
ctx.column_cnt_, ctx.need_calc_column_chksum_);
header_ = reinterpret_cast<ObMicroBlockHeader*>(data_buffer_.data());
if (OB_FAIL(data_buffer_.advance(header_size))) {
if (OB_FAIL(data_buffer_.write_nop(header_size, true))) {
LOG_WARN("data buffer fail to advance header size.", K(ret), K(header_size));
} else {
MEMSET(header_, 0, header_size);
header_->magic_ = MICRO_BLOCK_HEADER_MAGIC;
header_->version_ = MICRO_BLOCK_HEADER_VERSION;
header_->header_size_ = header_size;
header_->row_store_type_ = ctx.row_store_type_;
header_->column_count_ = ctx.column_cnt_;
header_->rowkey_column_count_ = ctx.rowkey_column_cnt_;
header_->has_column_checksum_ = ctx.need_calc_column_chksum_;
if (header_->has_column_checksum_) {
header_->column_checksums_ = reinterpret_cast<int64_t *>(
ObMicroBlockHeader *header = get_header(data_buffer_);
header->magic_ = MICRO_BLOCK_HEADER_MAGIC;
header->version_ = MICRO_BLOCK_HEADER_VERSION;
header->header_size_ = header_size;
header->row_store_type_ = ctx.row_store_type_;
header->column_count_ = ctx.column_cnt_;
header->rowkey_column_count_ = ctx.rowkey_column_cnt_;
header->has_column_checksum_ = ctx.need_calc_column_chksum_;
if (header->has_column_checksum_) {
header->column_checksums_ = reinterpret_cast<int64_t *>(
data_buffer_.data() + ObMicroBlockHeader::COLUMN_CHECKSUM_PTR_OFFSET);
} else {
header_->column_checksums_ = nullptr;
}
}
}
@ -508,39 +496,43 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else {
ObMicroBlockHeader *header = get_header(data_buffer_);
// detect extend value bit
header_->extend_value_bit_ = 0;
header->extend_value_bit_ = 0;
FOREACH(e, encoders_) {
ObIColumnEncoder::EncoderDesc &desc = (*e)->get_desc();
if (desc.has_null_ || desc.has_nope_) {
header_->extend_value_bit_ = 1;
header->extend_value_bit_ = 1;
if (desc.has_nope_) {
header_->extend_value_bit_ = 2;
header->extend_value_bit_ = 2;
break;
}
}
}
FOREACH(e, encoders_) {
((*e))->set_extend_value_bit(header_->extend_value_bit_);
((*e))->set_extend_value_bit(header->extend_value_bit_);
}
const int64_t header_size = data_buffer_.pos();
const int64_t header_size = data_buffer_.length();
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
encoding_meta_offset = header_size + col_header_size;
if (OB_FAIL(data_buffer_.advance_zero(col_header_size))) {
if (OB_FAIL(data_buffer_.write_nop(col_header_size))) {
LOG_WARN("advance data buffer failed", K(ret), K(col_header_size), K(encoding_meta_offset));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); ++i) {
int64_t pos_bak = data_buffer_.pos();
int64_t pos_bak = data_buffer_.length();
ObIColumnEncoder::EncoderDesc &desc = encoders_.at(i)->get_desc();
if (OB_FAIL(encoders_.at(i)->store_meta(data_buffer_))) {
ObBufferWriter buffer_writer(data_buffer_.data(), data_buffer_.size(), data_buffer_.length());
if (OB_FAIL(encoders_.at(i)->store_meta(buffer_writer))) {
LOG_WARN("store encoding meta failed", K(ret));
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
} else {
ObColumnHeader &ch = encoders_.at(i)->get_column_header();
if (data_buffer_.pos() > pos_bak) {
if (data_buffer_.length() > pos_bak) {
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
ch.length_ = static_cast<uint32_t>(data_buffer_.pos() - pos_bak);
ch.length_ = static_cast<uint32_t>(data_buffer_.length() - pos_bak);
} else if (ObColumnHeader::RAW == encoders_.at(i)->get_type()) {
// column header offset records the start pos of the fix data, if needed
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
@ -549,14 +541,18 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta
}
if (OB_SUCC(ret) && !desc.is_var_data_ && desc.need_data_store_) {
if (OB_FAIL(encoders_.at(i)->store_fix_data(data_buffer_))) {
if (OB_FAIL(encoders_.at(i)->store_fix_data(buffer_writer))) {
LOG_WARN("failed to store fixed data", K(ret));
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
}
}
}
}
header_->row_data_offset_ = static_cast<int32_t>(data_buffer_.pos());
if (OB_SUCC(ret)) {
get_header(data_buffer_)->row_data_offset_ = static_cast<int32_t>(data_buffer_.length());
}
}
return ret;
}
@ -564,18 +560,24 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta
int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
{
int ret = OB_SUCCESS;
int64_t encoders_need_size = 0;
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(datum_rows_.empty())) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("empty micro block", K(ret));
} else if (OB_FAIL(set_datum_rows_ptr())) {
STORAGE_LOG(WARN, "fail to set datum rows ptr", K(ret));
} else if (OB_FAIL(pivot())) {
LOG_WARN("pivot rows to columns failed", K(ret));
} else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {
LOG_WARN("array reserve failed", K(ret), "count", datum_rows_.count());
} else if (OB_FAIL(encoder_detection())) {
} else if (OB_FAIL(encoder_detection(encoders_need_size))) {
LOG_WARN("detect column encoding failed", K(ret));
} else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) {
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
} else {
STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct),
K(datum_rows_.count()), K(ctx_));
@ -592,7 +594,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
if (OB_FAIL(set_row_data_pos(fix_data_size))) {
LOG_WARN("set row data position failed", K(ret));
} else {
header_->var_column_count_ = static_cast<uint16_t>(var_data_encoders_.count());
get_header(data_buffer_)->var_column_count_ = static_cast<uint16_t>(var_data_encoders_.count());
}
}
@ -606,18 +608,20 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
// <4> fill row index
if (OB_SUCC(ret)) {
if (var_data_encoders_.empty()) {
header_->row_index_byte_ = 0;
get_header(data_buffer_)->row_index_byte_ = 0;
} else {
header_->row_index_byte_ = 2;
get_header(data_buffer_)->row_index_byte_ = 2;
if (row_indexs_.at(row_indexs_.count() - 1) > UINT16_MAX) {
header_->row_index_byte_ = 4;
get_header(data_buffer_)->row_index_byte_ = 4;
}
ObIntegerArrayGenerator gen;
const int64_t row_index_size = row_indexs_.count() * header_->row_index_byte_;
if (OB_FAIL(gen.init(data_buffer_.current(), header_->row_index_byte_))) {
const int64_t row_index_size = row_indexs_.count() * get_header(data_buffer_)->row_index_byte_;
if (OB_FAIL(data_buffer_.ensure_space(row_index_size))) {
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(row_index_size), K(data_buffer_));
} else if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) {
LOG_WARN("init integer array generator failed",
K(ret), "byte", header_->row_index_byte_);
} else if (OB_FAIL(data_buffer_.advance_zero(row_index_size))) {
K(ret), "byte", get_header(data_buffer_)->row_index_byte_);
} else if (OB_FAIL(data_buffer_.write_nop(row_index_size))) {
LOG_WARN("advance data buffer failed", K(ret), K(row_index_size));
} else {
for (int64_t idx = 0; idx < row_indexs_.count(); ++idx) {
@ -629,11 +633,11 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
// <5> fill header
if (OB_SUCC(ret)) {
header_->row_count_ = static_cast<uint32_t>(datum_rows_.count());
header_->has_string_out_row_ = has_string_out_row_;
header_->all_lob_in_row_ = !has_lob_out_row_;
const int64_t header_size = header_->header_size_;
get_header(data_buffer_)->row_count_ = static_cast<uint32_t>(datum_rows_.count());
get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_;
get_header(data_buffer_)->all_lob_in_row_ = !has_lob_out_row_;
get_header(data_buffer_)->max_merged_trans_version_ = max_merged_trans_version_;
const int64_t header_size = get_header(data_buffer_)->header_size_;
char *data = data_buffer_.data() + header_size;
FOREACH(e, encoders_) {
MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader));
@ -691,7 +695,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
if (OB_SUCC(ret)) {
buf = data_buffer_.data();
size = data_buffer_.pos();
size = data_buffer_.length();
}
}
@ -711,6 +715,7 @@ int ObMicroBlockEncoder::set_row_data_pos(int64_t &fix_data_size)
// detect extend value bit size and dispatch encoders.
int64_t ext_bit_size = 0;
ObMicroBlockHeader *header = get_header(data_buffer_);
FOREACH_X(e, encoders_, OB_SUCC(ret)) {
ObIColumnEncoder::EncoderDesc &desc = (*e)->get_desc();
if (desc.need_data_store_ && desc.is_var_data_) {
@ -718,7 +723,7 @@ int ObMicroBlockEncoder::set_row_data_pos(int64_t &fix_data_size)
LOG_WARN("add encoder to array failed", K(ret));
} else if (desc.need_extend_value_bit_store_) {
(*e)->get_column_header().extend_value_index_ = static_cast<int16_t>(ext_bit_size);
ext_bit_size += header_->extend_value_bit_;
ext_bit_size += header->extend_value_bit_;
}
}
}
@ -802,8 +807,10 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
LOG_WARN("reserve array failed", K(ret), "count", var_data_encoders_.count());
} else if (OB_FAIL(row_indexs_.push_back(0))) {
LOG_WARN("add row index failed", K(ret));
} else if (OB_FAIL(data_buffer_.set_lazy_move_cur_buf())) {
STORAGE_LOG(WARN, "fail to set lazy move", K(ret), K(data_buffer_));
} else {
const int64_t row_data_offset = get_header(data_buffer_)->row_data_offset_;
for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) {
if (OB_FAIL(var_lengths.push_back(0))) {
LOG_WARN("add var data length failed", K(ret));
@ -836,51 +843,58 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
// fill var data
row_size = fix_data_size + var_data_size + (column_index_byte > 0 ? 1 : 0)
+ column_index_byte * (var_data_encoders_.count() - 1);
char *data = data_buffer_.current();
ObBitStream bs;
if (OB_FAIL(ret)) {
} else if (OB_FAIL(data_buffer_.advance_zero(row_size))) {
LOG_WARN("advance buffer failed", K(ret), K_(data_buffer), K(row_size));
} else if (OB_FAIL(bs.init(reinterpret_cast<unsigned char *>(data), fix_data_size))) {
LOG_WARN("init bit stream failed", K(ret), K(fix_data_size));
} else if (OB_FAIL(data_buffer_.ensure_space(row_size))) {
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(row_size), K(data_buffer_));
} else {
MEMSET(data, 0, row_size);
char *column_index_byte_ptr = data + fix_data_size;
char *data = data_buffer_.current();
ObBitStream bs;
if (OB_FAIL(data_buffer_.write_nop(row_size))) {
LOG_WARN("advance buffer failed", K(ret), K_(data_buffer), K(row_size));
} else if (OB_FAIL(bs.init(reinterpret_cast<unsigned char *>(data), fix_data_size))) {
LOG_WARN("init bit stream failed", K(ret), K(fix_data_size));
} else {
MEMSET(data, 0, row_size);
char *column_index_byte_ptr = data + fix_data_size;
ObIntegerArrayGenerator gen;
if (OB_SUCC(ret) && column_index_byte > 0) {
if (OB_FAIL(gen.init(data + fix_data_size + 1, column_index_byte))) {
LOG_WARN("init integer array generator failed ", K(ret), K(column_index_byte));
}
}
if (OB_SUCC(ret) && !var_data_encoders_.empty()) {
char *var_data = column_index_byte_ptr;
if (column_index_byte > 0) {
*column_index_byte_ptr = static_cast<int8_t>(column_index_byte);
var_data += 1 + column_index_byte * (var_data_encoders_.count() - 1);
}
int64_t offset = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) {
if (i > 0) {
gen.get_array().set(i - 1, offset);
ObIntegerArrayGenerator gen;
if (OB_SUCC(ret) && column_index_byte > 0) {
if (OB_FAIL(gen.init(data + fix_data_size + 1, column_index_byte))) {
LOG_WARN("init integer array generator failed ", K(ret), K(column_index_byte));
}
const int64_t length = var_lengths.at(i);
if (OB_FAIL(store_data(*var_data_encoders_.at(i),
row_id, bs, var_data + offset, length))) {
LOG_WARN("store var length data failed",
K(ret), K(row_id), K(offset), K(length));
}
if (OB_SUCC(ret) && !var_data_encoders_.empty()) {
char *var_data = column_index_byte_ptr;
if (column_index_byte > 0) {
*column_index_byte_ptr = static_cast<int8_t>(column_index_byte);
var_data += 1 + column_index_byte * (var_data_encoders_.count() - 1);
}
int64_t offset = 0;
for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) {
if (i > 0) {
gen.get_array().set(i - 1, offset);
}
const int64_t length = var_lengths.at(i);
if (OB_FAIL(store_data(*var_data_encoders_.at(i),
row_id, bs, var_data + offset, length))) {
LOG_WARN("store var length data failed",
K(ret), K(row_id), K(offset), K(length));
}
offset += length;
}
offset += length;
}
}
}
if (OB_SUCC(ret)) {
if (OB_FAIL(row_indexs_.push_back(data_buffer_.pos() - header_->row_data_offset_))) {
if (OB_FAIL(row_indexs_.push_back(data_buffer_.length() - row_data_offset))) {
LOG_WARN("add row index failed", K(ret));
}
}
}
if (OB_SUCC(ret)) {
data_buffer_.move_buf();
}
}
return ret;
@ -905,6 +919,22 @@ int ObMicroBlockEncoder::init_column_ctxs()
return ret;
}
int ObMicroBlockEncoder::set_datum_rows_ptr()
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(row_buf_holder_.has_expand())) {
for (int64_t i = 0; OB_SUCC(ret) && i < datum_rows_.count(); i++) {
ObConstDatumRow &datum_row = datum_rows_.at(i);
if (OB_FAIL(datum_row.set_datums_ptr(row_buf_holder_.data()))) {
STORAGE_LOG(WARN, "fail to set datums ptr", K(ret), K(datum_row), K(row_buf_holder_));
}
}
}
return ret;
}
int ObMicroBlockEncoder::process_out_row_columns(const ObDatumRow &row)
{
// make sure in&out row status of all values in a column are same
@ -944,33 +974,61 @@ int ObMicroBlockEncoder::copy_and_append_row(const ObDatumRow &src, int64_t &sto
// performance critical, do not double check parameters in private method
const int64_t datums_len = sizeof(ObDatum) * src.get_column_count();
bool is_large_row = false;
const int64_t datum_row_offset = length_;
ObDatum *datum_arr = nullptr;
if (datum_rows_.count() > 0
&& (length_ + datums_len >= estimate_size_limit_ || estimate_size_ >= estimate_size_limit_)) {
ret = OB_BUF_NOT_ENOUGH;
} else if (0 == datum_rows_.count() && length_ + datums_len >= estimate_size_limit_) {
is_large_row = true;
} else if (OB_FAIL(row_buf_holder_.ensure_space(datums_len))) {
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(datums_len), K(row_buf_holder_));
} else {
char *datum_arr_buf = row_buf_holder_.get_buf() + length_;
MEMSET(datum_arr_buf, 0, datums_len);
datum_arr = reinterpret_cast<ObDatum *>(datum_arr_buf);
length_ += datums_len;
store_size = 0;
for (int64_t col_idx = 0;
OB_SUCC(ret) && col_idx < src.get_column_count() && !is_large_row;
++col_idx) {
if (OB_FAIL(copy_cell(
ctx_.col_descs_->at(col_idx),
src.storage_datums_[col_idx],
datum_arr[col_idx],
store_size,
is_large_row))) {
if (OB_UNLIKELY(OB_BUF_NOT_ENOUGH != ret)) {
LOG_WARN("fail to copy cell", K(ret), K(col_idx), K(src), K(store_size), K(is_large_row));
bool is_finish = false;
while(OB_SUCC(ret) && !is_finish) {
char *datum_arr_buf = row_buf_holder_.data() + datum_row_offset;
const int64_t buffer_size = row_buf_holder_.size();
MEMSET(datum_arr_buf, 0, datums_len);
datum_arr = reinterpret_cast<ObDatum *>(datum_arr_buf);
length_ = datum_row_offset + datums_len;
store_size = 0;
is_finish = true;
bool is_buffer_not_enough = false;
for (int64_t col_idx = 0;
OB_SUCC(ret) && col_idx < src.get_column_count() && !is_large_row; ++col_idx) {
if (OB_FAIL(copy_cell(
ctx_.col_descs_->at(col_idx),
src.storage_datums_[col_idx],
datum_arr[col_idx],
store_size,
is_large_row,
is_buffer_not_enough))) {
if (OB_UNLIKELY(OB_BUF_NOT_ENOUGH != ret)) {
LOG_WARN("fail to copy cell", K(ret), K(col_idx), K(src), K(store_size), K(is_large_row));
}
} else if (is_buffer_not_enough) {
int64_t var_len_column_cnt = 0;
int64_t need_size = calc_datum_row_size(src, var_len_column_cnt);
if (ctx_.major_working_cluster_version_ >= DATA_VERSION_4_1_0_0) {
need_size = need_size + var_len_column_cnt * sizeof(uint64_t);
}
if (OB_UNLIKELY(need_size <= row_buf_holder_.size() - row_buf_holder_.length())) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected buffer not enough", K(ret), K(need_size), K(row_buf_holder_));
} else if (OB_FAIL(row_buf_holder_.ensure_space(need_size))) {// ensure cell of row is contiguous
STORAGE_LOG(WARN, "failed to ensure space", K(ret), K(need_size), K(row_buf_holder_));
} else {
is_finish = false;
break;
}
}
}
}
if (OB_SUCC(ret) && !is_large_row) {
if (OB_FAIL(row_buf_holder_.write_nop(length_ - datum_row_offset))) {
STORAGE_LOG(WARN, "fail to write nop", K(ret), K(length_), K(datum_row_offset), K(row_buf_holder_));
}
}
}
if (OB_FAIL(ret)) {
@ -981,7 +1039,7 @@ int ObMicroBlockEncoder::copy_and_append_row(const ObDatumRow &src, int64_t &sto
LOG_WARN("fail to try append row", K(ret));
}
} else {
ObConstDatumRow datum_row(datum_arr, src.get_column_count());
ObConstDatumRow datum_row(datum_arr, src.get_column_count(), datum_row_offset);
if (OB_FAIL(datum_rows_.push_back(datum_row))) {
LOG_WARN("append row to array failed", K(ret), K(src));
}
@ -994,14 +1052,16 @@ int ObMicroBlockEncoder::copy_cell(
const ObStorageDatum &src,
ObDatum &dest,
int64_t &store_size,
bool &is_large_row)
bool &is_large_row,
bool &is_buffer_not_enough)
{
// For IntSC and UIntSC, normalize to 8 byte
int ret = OB_SUCCESS;
ObObjTypeStoreClass store_class = get_store_class_map()[col_desc.col_type_.get_type_class()];
const bool is_int_sc = store_class == ObIntSC || store_class == ObUIntSC;
int64_t datum_size = 0;
dest.ptr_ = row_buf_holder_.get_buf() + length_;
is_buffer_not_enough = false;
dest.ptr_ = row_buf_holder_.data() + length_; //reset by set_datum_rows_ptr
dest.pack_ = src.pack_;
if (src.is_null()) {
dest.set_null();
@ -1030,6 +1090,8 @@ int ObMicroBlockEncoder::copy_cell(
// we still use micro block to store it, but its size is unknown, need special treat
} else if (datum_rows_.count() == 0 && estimate_size_ + store_size >= estimate_size_limit_) {
is_large_row = true;
} else if (row_buf_holder_.size() < length_ + datum_size) {
is_buffer_not_enough = true;
} else {
if (is_int_sc) {
MEMSET(const_cast<char *>(dest.ptr_), 0, datum_size);
@ -1049,34 +1111,20 @@ int ObMicroBlockEncoder::process_large_row(
// copy_size is the size in memory, including the ObDatum struct and extra space for data.
// store_size represents for the serialized data size on disk,
const int64_t datums_len = sizeof(ObDatum) * src.get_column_count();
int64_t copy_size = datums_len;
int64_t var_len_column_cnt = 0;
for (int64_t col_idx = 0; col_idx < src.get_column_count(); ++col_idx) {
const ObColDesc &col_desc = ctx_.col_descs_->at(col_idx);
ObObjTypeStoreClass store_class = get_store_class_map()[col_desc.col_type_.get_type_class()];
ObStorageDatum &datum = src.storage_datums_[col_idx];
if (!datum.is_null()) {
if (store_class == ObIntSC || store_class == ObUIntSC) {
copy_size += sizeof(uint64_t);
} else {
copy_size += datum.len_;
if (is_var_length_type(store_class)) {
++var_len_column_cnt;
}
}
} else {
copy_size += sizeof(uint64_t);
}
}
int64_t copy_size = calc_datum_row_size(src, var_len_column_cnt);
if (ctx_.major_working_cluster_version_ >= DATA_VERSION_4_1_0_0) {
store_size = copy_size - datums_len + var_len_column_cnt * sizeof(uint64_t);
} else {
store_size = copy_size - datums_len;
}
if (OB_FAIL(row_buf_holder_.try_alloc(copy_size))) {
LOG_WARN("Fail to alloc large row buffer", K(ret), K(copy_size));
if (OB_UNLIKELY(row_buf_holder_.length() != 0)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected row buffer len", K(ret), K(row_buf_holder_));
} else if (OB_FAIL(row_buf_holder_.write_nop(copy_size))) {
LOG_WARN("Fail to ensure_space", K(ret), K(copy_size), K(row_buf_holder_));
} else {
char *buf = row_buf_holder_.get_buf();
char *buf = row_buf_holder_.data();
MEMSET(buf, 0, datums_len);
datum_arr = reinterpret_cast<ObDatum *>(buf);
buf += datums_len;
@ -1106,6 +1154,31 @@ int ObMicroBlockEncoder::process_large_row(
return ret;
}
int64_t ObMicroBlockEncoder::calc_datum_row_size(const ObDatumRow &src, int64_t &var_len_column_cnt) const
{
int64_t need_size = sizeof(ObDatum) * src.get_column_count();
var_len_column_cnt = 0;
for (int64_t col_idx = 0; col_idx < src.get_column_count(); ++col_idx) {
const ObColDesc &col_desc = ctx_.col_descs_->at(col_idx);
ObObjTypeStoreClass store_class = get_store_class_map()[col_desc.col_type_.get_type_class()];
ObStorageDatum &datum = src.storage_datums_[col_idx];
if (!datum.is_null()) {
if (store_class == ObIntSC || store_class == ObUIntSC) {
need_size += sizeof(uint64_t);
} else {
need_size += datum.len_;
if (is_var_length_type(store_class)) {
++var_len_column_cnt;
}
}
} else {
need_size += sizeof(uint64_t);
}
}
return need_size;
}
int ObMicroBlockEncoder::prescan(const int64_t column_index)
{
int ret = OB_SUCCESS;
@ -1170,7 +1243,7 @@ int ObMicroBlockEncoder::prescan(const int64_t column_index)
return ret;
}
int ObMicroBlockEncoder::encoder_detection()
int ObMicroBlockEncoder::encoder_detection(int64_t &encoders_need_size)
{
int ret = OB_SUCCESS;
if (IS_NOT_INIT) {
@ -1194,6 +1267,7 @@ int ObMicroBlockEncoder::encoder_detection()
c->extend_value_bit_ = extend_value_bit;
}
}
encoders_need_size = 0;
for (int64_t i = 0; OB_SUCC(ret) && i <ctx_.column_cnt_; ++i) {
if (OB_FAIL(fast_encoder_detect(i, col_ctxs_.at(i)))) {
LOG_WARN("fast encoder detect failed", K(ret), K(i));
@ -1207,6 +1281,18 @@ int ObMicroBlockEncoder::encoder_detection()
}
}
}
int64_t need_size = 0;
int64_t encoder_cnt = encoders_.count();
if (OB_FAIL(ret)) {
} else if (OB_UNLIKELY(encoder_cnt <= i)) {
ret = OB_ERR_UNEXPECTED;
STORAGE_LOG(WARN, "unexpected encoder cnt", K(ret), K(encoder_cnt), K(i));
} else if (OB_FAIL(encoders_.at(encoder_cnt - 1)->get_encoding_store_meta_need_space(need_size))) {
STORAGE_LOG(WARN, "fail to get_encoding_store_meta_need_space", K(ret), K(i), K(encoders_));
} else {
need_size += encoders_.at(encoder_cnt - 1)->calc_encoding_fix_data_need_space();
encoders_need_size += need_size;
}
}
if (OB_FAIL(ret)) {
free_encoders();
@ -1800,7 +1886,7 @@ int ObMicroBlockEncoder::force_raw_encoding(const int64_t column_idx,
ret = OB_ERR_UNEXPECTED;
LOG_WARN("find raw encoder not suitable", K(ret));
} else {
e->set_extend_value_bit(header_->extend_value_bit_);
e->set_extend_value_bit(get_header(data_buffer_)->extend_value_bit_);
}
}