[opt] micro buffer writer
This commit is contained in:
@ -93,6 +93,7 @@ int ObMicroBlockEncoder::try_encoder(ObIColumnEncoder *&encoder, const int64_t c
|
||||
|
||||
#define MICRO_BLOCK_PAGE_ALLOCATOR ModulePageAllocator(common::ObModIds::OB_ENCODER_ALLOCATOR, MTL_ID())
|
||||
ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
|
||||
encoding_meta_allocator_(),
|
||||
data_buffer_(),
|
||||
datum_rows_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
|
||||
all_col_datums_(OB_MALLOC_NORMAL_BLOCK_SIZE, MICRO_BLOCK_PAGE_ALLOCATOR),
|
||||
@ -113,6 +114,7 @@ ObMicroBlockEncoder::ObMicroBlockEncoder() : ctx_(), header_(NULL),
|
||||
length_(0),
|
||||
is_inited_(false)
|
||||
{
|
||||
encoding_meta_allocator_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
|
||||
datum_rows_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
|
||||
all_col_datums_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
|
||||
encoders_.set_attr(ObMemAttr(MTL_ID(), "MicroBlkEncoder"));
|
||||
@ -207,6 +209,7 @@ void ObMicroBlockEncoder::reset()
|
||||
ObIMicroBlockWriter::reset();
|
||||
is_inited_ = false;
|
||||
//ctx_
|
||||
encoding_meta_allocator_.reset();
|
||||
data_buffer_.reset();
|
||||
datum_rows_.reset();
|
||||
FOREACH(cv, all_col_datums_) {
|
||||
@ -242,6 +245,7 @@ void ObMicroBlockEncoder::reuse()
|
||||
ObIMicroBlockWriter::reuse();
|
||||
// is_inited_
|
||||
// ctx_
|
||||
encoding_meta_allocator_.reuse();
|
||||
data_buffer_.reuse();
|
||||
datum_rows_.reuse();
|
||||
FOREACH(c, all_col_datums_) {
|
||||
@ -497,7 +501,7 @@ int ObMicroBlockEncoder::reserve_header(const ObMicroBlockEncodingCtx &ctx)
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta_offset)
|
||||
int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(ObBufferWriter &buf_writer, int64_t &encoding_meta_offset)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (IS_NOT_INIT) {
|
||||
@ -529,37 +533,32 @@ int ObMicroBlockEncoder::store_encoding_meta_and_fix_cols(int64_t &encoding_meta
|
||||
LOG_WARN("advance data buffer failed", K(ret), K(col_header_size), K(encoding_meta_offset));
|
||||
} else {
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); ++i) {
|
||||
int64_t pos_bak = data_buffer_.length();
|
||||
int64_t pos_bak = buf_writer.length();
|
||||
ObIColumnEncoder::EncoderDesc &desc = encoders_.at(i)->get_desc();
|
||||
ObBufferWriter buffer_writer(data_buffer_.data(), data_buffer_.size(), data_buffer_.length());
|
||||
if (OB_FAIL(encoders_.at(i)->store_meta(buffer_writer))) {
|
||||
if (OB_FAIL(encoders_.at(i)->store_meta(buf_writer))) {
|
||||
LOG_WARN("store encoding meta failed", K(ret));
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
|
||||
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
|
||||
} else {
|
||||
ObColumnHeader &ch = encoders_.at(i)->get_column_header();
|
||||
if (data_buffer_.length() > pos_bak) {
|
||||
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
|
||||
ch.length_ = static_cast<uint32_t>(data_buffer_.length() - pos_bak);
|
||||
if (buf_writer.length() > pos_bak) {
|
||||
ch.offset_ = static_cast<uint32_t>(pos_bak);
|
||||
ch.length_ = static_cast<uint32_t>(buf_writer.length() - pos_bak);
|
||||
} else if (ObColumnHeader::RAW == encoders_.at(i)->get_type()) {
|
||||
// column header offset records the start pos of the fix data, if needed
|
||||
ch.offset_ = static_cast<uint32_t>(pos_bak - encoding_meta_offset);
|
||||
ch.offset_ = static_cast<uint32_t>(pos_bak);
|
||||
}
|
||||
ch.obj_type_ = static_cast<uint8_t>(encoders_.at(i)->get_obj_type());
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && !desc.is_var_data_ && desc.need_data_store_) {
|
||||
if (OB_FAIL(encoders_.at(i)->store_fix_data(buffer_writer))) {
|
||||
if (OB_FAIL(encoders_.at(i)->store_fix_data(buf_writer))) {
|
||||
LOG_WARN("failed to store fixed data", K(ret));
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(buffer_writer.length() - data_buffer_.length()))) {
|
||||
STORAGE_LOG(WARN, "failed to wtite nop", K(ret), K(buffer_writer), K(data_buffer_));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
get_header(data_buffer_)->row_data_offset_ = static_cast<int32_t>(data_buffer_.length());
|
||||
get_header(data_buffer_)->row_data_offset_ = static_cast<int32_t>(encoding_meta_offset + buf_writer.length());
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -570,6 +569,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t encoders_need_size = 0;
|
||||
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
|
||||
char *encoding_meta_buf = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
@ -586,14 +586,23 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
|
||||
LOG_WARN("detect column encoding failed", K(ret));
|
||||
} else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
|
||||
// encoder pointers depend on this memory during build_block(), so its life cycle needs to outlast build_block().
|
||||
} else if (OB_ISNULL(encoding_meta_buf = static_cast<char *>(encoding_meta_allocator_.alloc(encoders_need_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct),
|
||||
K(datum_rows_.count()), K(ctx_));
|
||||
|
||||
// <1> store encoding metas and fix cols data
|
||||
// <1> store encoding metas and fix cols data in encoding_meta_buffer
|
||||
int64_t encoding_meta_offset = 0;
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) {
|
||||
int64_t encoding_meta_size = 0;
|
||||
ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0);
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) {
|
||||
LOG_WARN("failed to store encoding meta and fixed col data", K(ret));
|
||||
} else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) {
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) {
|
||||
STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_));
|
||||
}
|
||||
|
||||
// <2> set row data store offset
|
||||
@ -639,7 +648,7 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
|
||||
}
|
||||
}
|
||||
|
||||
// <5> fill header
|
||||
// <5> fill header, encoding_meta and fix cols data
|
||||
if (OB_SUCC(ret)) {
|
||||
get_header(data_buffer_)->row_count_ = static_cast<uint32_t>(datum_rows_.count());
|
||||
get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_;
|
||||
@ -651,6 +660,8 @@ int ObMicroBlockEncoder::build_block(char *&buf, int64_t &size)
|
||||
MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader));
|
||||
data += sizeof(ObColumnHeader);
|
||||
}
|
||||
// fill encoding meta and fix cols data
|
||||
MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -815,8 +826,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
|
||||
LOG_WARN("reserve array failed", K(ret), "count", var_data_encoders_.count());
|
||||
} else if (OB_FAIL(row_indexs_.push_back(0))) {
|
||||
LOG_WARN("add row index failed", K(ret));
|
||||
} else if (OB_FAIL(data_buffer_.set_lazy_move_cur_buf())) {
|
||||
STORAGE_LOG(WARN, "fail to set lazy move", K(ret), K(data_buffer_));
|
||||
} else {
|
||||
const int64_t row_data_offset = get_header(data_buffer_)->row_data_offset_;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < var_data_encoders_.count(); ++i) {
|
||||
@ -900,9 +909,6 @@ int ObMicroBlockEncoder::fill_row_data(const int64_t fix_data_size)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
data_buffer_.move_buf();
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -143,7 +143,7 @@ private:
|
||||
|
||||
void update_estimate_size_limit(const ObMicroBlockEncodingCtx &ctx);
|
||||
|
||||
int store_encoding_meta_and_fix_cols(int64_t &encoding_meta_offset);
|
||||
int store_encoding_meta_and_fix_cols(ObBufferWriter &buf_writer, int64_t &encoding_meta_offset);
|
||||
int init_all_col_values(const ObMicroBlockEncodingCtx &ctx);
|
||||
void print_micro_block_encoder_status() const;
|
||||
int set_datum_rows_ptr();
|
||||
@ -151,6 +151,7 @@ private:
|
||||
private:
|
||||
ObMicroBlockEncodingCtx ctx_;
|
||||
ObMicroBlockHeader *header_;
|
||||
ObArenaAllocator encoding_meta_allocator_;
|
||||
ObMicroBufferWriter data_buffer_;
|
||||
ObConstDatumRowArray datum_rows_;
|
||||
common::ObArray<ObColDatums *> all_col_datums_;
|
||||
|
@ -90,16 +90,10 @@ int ObMicroBufferWriter::init(const int64_t capacity, const int64_t reserve_size
|
||||
|
||||
void ObMicroBufferWriter::reset()
|
||||
{
|
||||
if (old_buf_ != nullptr) {
|
||||
allocator_.free(old_buf_);
|
||||
old_buf_ = nullptr;
|
||||
}
|
||||
if (data_ != nullptr) {
|
||||
allocator_.free(data_);
|
||||
data_ = nullptr;
|
||||
}
|
||||
old_size_ = 0;
|
||||
lazy_move_ = false;
|
||||
has_expand_ = false;
|
||||
memory_reclaim_cnt_ = 0;
|
||||
reset_memory_threshold_ = 0;
|
||||
@ -113,11 +107,6 @@ void ObMicroBufferWriter::reset()
|
||||
|
||||
void ObMicroBufferWriter::reuse()
|
||||
{
|
||||
if (old_buf_ != nullptr) {
|
||||
int ret = OB_ERR_SYS;
|
||||
STORAGE_LOG(ERROR, "unexcpected old buf", K(ret), K(*this));
|
||||
abort();
|
||||
}
|
||||
if (buffer_size_ > default_reserve_ && len_ <= default_reserve_) {
|
||||
memory_reclaim_cnt_++;
|
||||
if (memory_reclaim_cnt_ >= reset_memory_threshold_) {
|
||||
@ -136,8 +125,6 @@ void ObMicroBufferWriter::reuse()
|
||||
} else {
|
||||
memory_reclaim_cnt_ = 0;
|
||||
}
|
||||
old_size_ = 0;
|
||||
lazy_move_ = false;
|
||||
has_expand_ = false;
|
||||
len_ = 0;
|
||||
}
|
||||
@ -179,14 +166,8 @@ int ObMicroBufferWriter::reserve(const int64_t size)
|
||||
STORAGE_LOG(WARN, "failed to alloc memory", K(ret), K(alloc_size));
|
||||
} else if (data_ != nullptr) {
|
||||
has_expand_ = true;
|
||||
if (lazy_move_) {
|
||||
lazy_move_ = false;
|
||||
old_buf_ = data_;
|
||||
old_size_ = len_;
|
||||
} else {
|
||||
MEMCPY(buf, data_, len_);
|
||||
allocator_.free(data_);
|
||||
}
|
||||
data_ = nullptr;
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
|
@ -99,10 +99,7 @@ public:
|
||||
data_(nullptr),
|
||||
reset_memory_threshold_(0),
|
||||
memory_reclaim_cnt_(0),
|
||||
has_expand_(false),
|
||||
lazy_move_(false),
|
||||
old_buf_(nullptr),
|
||||
old_size_(0)
|
||||
has_expand_(false)
|
||||
{}
|
||||
~ObMicroBufferWriter() { reset(); };
|
||||
int init(const int64_t capacity, const int64_t reserve_size = DEFAULT_MIDDLE_BLOCK_SIZE);
|
||||
@ -111,32 +108,10 @@ public:
|
||||
inline int64_t remain_buffer_size() const { return buffer_size_ - len_; }
|
||||
inline int64_t size() const { return buffer_size_; } //curr buffer size
|
||||
inline bool has_expand() const { return has_expand_; }
|
||||
inline char *data() { assert(old_buf_ == nullptr); return data_; }
|
||||
inline char *data() { return data_; }
|
||||
inline char *current() { return data_ + len_; }
|
||||
int reserve(const int64_t size);
|
||||
int ensure_space(const int64_t append_size);
|
||||
// Do not use this outside of encoding; it exists only for the encoder.
// Arms the lazy-move flag (lazy_move_ = true) on this buffer writer.
// In reserve(), an armed flag makes an expansion stash the previous buffer in
// old_buf_ (recording old_size_) instead of copying it into the new buffer and
// freeing it right away.
// Returns OB_SUCCESS when the flag was set, or OB_ERR_UNEXPECTED when a
// previously stashed buffer (old_buf_) is still pending.
|
||||
int set_lazy_move_cur_buf()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// A non-null old_buf_ means an earlier stashed buffer has not been consumed yet.
if (OB_UNLIKELY(old_buf_ != nullptr)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
STORAGE_LOG(WARN, "unexpected old buf", K(ret));
|
||||
} else {
|
||||
lazy_move_ = true;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
// Clears the lazy-move flag and, if a previous buffer was stashed in old_buf_,
// copies its first old_size_ bytes to the start of the current buffer (data_),
// frees the stashed buffer through allocator_, and resets old_buf_ / old_size_.
// NOTE(review): presumably data_ is at least old_size_ bytes at this point
// (reserve() allocates a larger buffer) -- confirm against reserve().
void move_buf()
|
||||
{
|
||||
// Always disarm lazy move, whether or not an expansion happened meanwhile.
lazy_move_ = false;
|
||||
if (old_buf_ != nullptr) {
|
||||
MEMCPY(data_, old_buf_, old_size_);
|
||||
allocator_.free(old_buf_);
|
||||
// Clear the stash so no stale pointer is left behind.
old_buf_ = nullptr;
|
||||
old_size_ = 0;
|
||||
}
|
||||
}
|
||||
inline void pop_back(const int64_t size) { len_ = MAX(0, len_ - size); }
|
||||
int write_nop(const int64_t size, bool is_zero = false);
|
||||
int write(const ObDatumRow &row, const int64_t rowkey_cnt, int64_t &len);
|
||||
@ -163,7 +138,7 @@ public:
|
||||
void reset();
|
||||
inline int64_t length() const { return len_; }
|
||||
TO_STRING_KV(K_(capacity), K_(buffer_size), K_(len), K_(data), K_(default_reserve), K_(reset_memory_threshold),
|
||||
K_(memory_reclaim_cnt), K_(has_expand), K_(lazy_move), K_(old_buf), K_(old_size));
|
||||
K_(memory_reclaim_cnt), K_(has_expand));
|
||||
private:
|
||||
int expand(const int64_t size);
|
||||
private:
|
||||
@ -180,10 +155,6 @@ private:
|
||||
int64_t memory_reclaim_cnt_;
|
||||
bool has_expand_;
|
||||
|
||||
bool lazy_move_;
|
||||
char *old_buf_;
|
||||
int64_t old_size_;
|
||||
|
||||
private:
|
||||
static const int64_t MIN_BUFFER_SIZE = 1 << 12; //4kb
|
||||
static const int64_t MAX_DATA_BUFFER_SIZE = 2 * common::OB_DEFAULT_MACRO_BLOCK_SIZE; // 4m
|
||||
|
@ -305,11 +305,13 @@ public:
|
||||
int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t need_size;
|
||||
if (!is_inited_) {
|
||||
int64_t encoders_need_size = 0;
|
||||
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
|
||||
char *encoding_meta_buf = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (datum_rows_.empty()) {
|
||||
} else if (OB_UNLIKELY(datum_rows_.empty())) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("empty micro block", K(ret));
|
||||
} else if (OB_FAIL(set_datum_rows_ptr())) {
|
||||
@ -318,13 +320,11 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
LOG_WARN("pivot rows to columns failed", K(ret));
|
||||
} else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {
|
||||
LOG_WARN("array reserve failed", K(ret), "count", datum_rows_.count());
|
||||
} else if (OB_FAIL(encoder_detection(need_size))) {
|
||||
} else if (OB_FAIL(encoder_detection(encoders_need_size))) {
|
||||
LOG_WARN("detect column encoding failed", K(ret));
|
||||
} else if (OB_FAIL(data_buffer_.ensure_space(1<<20))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
|
||||
} else {
|
||||
|
||||
for (int64_t i = 0; i < ctx_.column_cnt_; ++i) {
|
||||
encoders_need_size = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_.column_cnt_; ++i) {
|
||||
const bool force_var_store = false;
|
||||
if (NULL != encoders_[i]) {
|
||||
free_encoder(encoders_[i]);
|
||||
@ -341,12 +341,36 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
encoders_[i] = e;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); i++) {
|
||||
int64_t need_size = 0;
|
||||
if (OB_FAIL(encoders_.at(i)->get_encoding_store_meta_need_space(need_size))) {
|
||||
STORAGE_LOG(WARN, "fail to get_encoding_store_meta_need_space", K(ret), K(i), K(encoders_));
|
||||
} else {
|
||||
need_size += encoders_.at(i)->calc_encoding_fix_data_need_space();
|
||||
encoders_need_size += need_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
|
||||
} else if (OB_ISNULL(encoding_meta_buf = static_cast<char *>(encoding_meta_allocator_.alloc(encoders_need_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct),
|
||||
K(datum_rows_.count()), K(ctx_));
|
||||
|
||||
// <1> store encoding metas and fix cols data
|
||||
// <1> store encoding metas and fix cols data in encoding_meta_buffer
|
||||
int64_t encoding_meta_offset = 0;
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) {
|
||||
int64_t encoding_meta_size = 0;
|
||||
ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0);
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) {
|
||||
LOG_WARN("failed to store encoding meta and fixed col data", K(ret));
|
||||
} else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) {
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) {
|
||||
STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_));
|
||||
}
|
||||
|
||||
// <2> set row data store offset
|
||||
@ -355,7 +379,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
if (OB_FAIL(set_row_data_pos(fix_data_size))) {
|
||||
LOG_WARN("set row data position failed", K(ret));
|
||||
} else {
|
||||
get_header(data_buffer_)->var_column_count_ = static_cast<int16_t>(var_data_encoders_.count());
|
||||
get_header(data_buffer_)->var_column_count_ = static_cast<uint16_t>(var_data_encoders_.count());
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,10 +401,12 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
}
|
||||
ObIntegerArrayGenerator gen;
|
||||
const int64_t row_index_size = row_indexs_.count() * get_header(data_buffer_)->row_index_byte_;
|
||||
if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) {
|
||||
if (OB_FAIL(data_buffer_.ensure_space(row_index_size))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(row_index_size), K(data_buffer_));
|
||||
} else if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) {
|
||||
LOG_WARN("init integer array generator failed",
|
||||
K(ret), "byte", get_header(data_buffer_)->row_index_byte_);
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(row_index_size, true))) {
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(row_index_size))) {
|
||||
LOG_WARN("advance data buffer failed", K(ret), K(row_index_size));
|
||||
} else {
|
||||
for (int64_t idx = 0; idx < row_indexs_.count(); ++idx) {
|
||||
@ -390,19 +416,20 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
}
|
||||
}
|
||||
|
||||
// <5> fill header
|
||||
// <5> fill header, encoding_meta and fix cols data
|
||||
if (OB_SUCC(ret)) {
|
||||
get_header(data_buffer_)->row_count_ = static_cast<int16_t>(datum_rows_.count());
|
||||
get_header(data_buffer_)->row_count_ = static_cast<uint32_t>(datum_rows_.count());
|
||||
get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_;
|
||||
get_header(data_buffer_)->all_lob_in_row_ = !has_lob_out_row_;
|
||||
|
||||
|
||||
get_header(data_buffer_)->max_merged_trans_version_ = max_merged_trans_version_;
|
||||
const int64_t header_size = get_header(data_buffer_)->header_size_;
|
||||
char *data = data_buffer_.data() + header_size;
|
||||
FOREACH(e, encoders_) {
|
||||
MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader));
|
||||
data += sizeof(ObColumnHeader);
|
||||
}
|
||||
// fill encoding meta and fix cols data
|
||||
MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -415,7 +442,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
ObIColumnEncoder *e = encoders_.at(idx);
|
||||
pe.type_ = static_cast<ObColumnHeader::Type>(e->get_column_header().type_);
|
||||
if (ObColumnHeader::is_inter_column_encoder(pe.type_)) {
|
||||
pe.ref_col_idx_ = static_cast<ObColumnEqualEncoder *>(e)->get_ref_col_idx();
|
||||
pe.ref_col_idx_ = static_cast<ObSpanColumnEncoder *>(e)->get_ref_col_idx();
|
||||
} else {
|
||||
pe.ref_col_idx_ = 0;
|
||||
}
|
||||
|
@ -53,11 +53,13 @@ public:
|
||||
int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
int64_t need_size = 0;
|
||||
if (!is_inited_) {
|
||||
int64_t encoders_need_size = 0;
|
||||
const int64_t col_header_size = ctx_.column_cnt_ * (sizeof(ObColumnHeader));
|
||||
char *encoding_meta_buf = nullptr;
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
LOG_WARN("not init", K(ret));
|
||||
} else if (datum_rows_.empty()) {
|
||||
} else if (OB_UNLIKELY(datum_rows_.empty())) {
|
||||
ret = OB_INNER_STAT_ERROR;
|
||||
LOG_WARN("empty micro block", K(ret));
|
||||
} else if (OB_FAIL(set_datum_rows_ptr())) {
|
||||
@ -66,11 +68,11 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
LOG_WARN("pivot rows to columns failed", K(ret));
|
||||
} else if (OB_FAIL(row_indexs_.reserve(datum_rows_.count()))) {
|
||||
LOG_WARN("array reserve failed", K(ret), "count", datum_rows_.count());
|
||||
} else if (OB_FAIL(encoder_detection(need_size))) {
|
||||
} else if (OB_FAIL(encoder_detection(encoders_need_size))) {
|
||||
LOG_WARN("detect column encoding failed", K(ret));
|
||||
} else {
|
||||
|
||||
for (int64_t i = 0; i < ctx_.column_cnt_; ++i) {
|
||||
encoders_need_size = 0;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < ctx_.column_cnt_; ++i) {
|
||||
const bool force_var_store = false;
|
||||
if (NULL != encoders_[i]) {
|
||||
free_encoder(encoders_[i]);
|
||||
@ -87,12 +89,36 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
encoders_[i] = e;
|
||||
}
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < encoders_.count(); i++) {
|
||||
int64_t need_size = 0;
|
||||
if (OB_FAIL(encoders_.at(i)->get_encoding_store_meta_need_space(need_size))) {
|
||||
STORAGE_LOG(WARN, "fail to get_encoding_store_meta_need_space", K(ret), K(i), K(encoders_));
|
||||
} else {
|
||||
need_size += encoders_.at(i)->calc_encoding_fix_data_need_space();
|
||||
encoders_need_size += need_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_FAIL(data_buffer_.ensure_space(col_header_size + encoders_need_size))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(data_buffer_));
|
||||
} else if (OB_ISNULL(encoding_meta_buf = static_cast<char *>(encoding_meta_allocator_.alloc(encoders_need_size)))) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
STORAGE_LOG(WARN, "fail to alloc fix header buf", K(ret), K(encoders_need_size));
|
||||
} else {
|
||||
STORAGE_LOG(DEBUG, "[debug] build micro block", K_(estimate_size), K_(header_size), K_(expand_pct),
|
||||
K(datum_rows_.count()), K(ctx_));
|
||||
|
||||
// <1> store encoding metas and fix cols data
|
||||
// <1> store encoding metas and fix cols data in encoding_meta_buffer
|
||||
int64_t encoding_meta_offset = 0;
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(encoding_meta_offset))) {
|
||||
int64_t encoding_meta_size = 0;
|
||||
ObBufferWriter meta_buf_writer(encoding_meta_buf, encoders_need_size, 0);
|
||||
if (OB_FAIL(store_encoding_meta_and_fix_cols(meta_buf_writer, encoding_meta_offset))) {
|
||||
LOG_WARN("failed to store encoding meta and fixed col data", K(ret));
|
||||
} else if (FALSE_IT(encoding_meta_size = meta_buf_writer.length())) {
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(encoding_meta_size))) {
|
||||
STORAGE_LOG(WARN, "failed to write nop", K(ret), K(meta_buf_writer), K(data_buffer_));
|
||||
}
|
||||
|
||||
// <2> set row data store offset
|
||||
@ -101,7 +127,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
if (OB_FAIL(set_row_data_pos(fix_data_size))) {
|
||||
LOG_WARN("set row data position failed", K(ret));
|
||||
} else {
|
||||
get_header(data_buffer_)->var_column_count_ = static_cast<int16_t>(var_data_encoders_.count());
|
||||
get_header(data_buffer_)->var_column_count_ = static_cast<uint16_t>(var_data_encoders_.count());
|
||||
}
|
||||
}
|
||||
|
||||
@ -123,10 +149,12 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
}
|
||||
ObIntegerArrayGenerator gen;
|
||||
const int64_t row_index_size = row_indexs_.count() * get_header(data_buffer_)->row_index_byte_;
|
||||
if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) {
|
||||
if (OB_FAIL(data_buffer_.ensure_space(row_index_size))) {
|
||||
STORAGE_LOG(WARN, "fail to ensure space", K(ret), K(row_index_size), K(data_buffer_));
|
||||
} else if (OB_FAIL(gen.init(data_buffer_.data() + data_buffer_.length(), get_header(data_buffer_)->row_index_byte_))) {
|
||||
LOG_WARN("init integer array generator failed",
|
||||
K(ret), "byte", get_header(data_buffer_)->row_index_byte_);
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(row_index_size, true))) {
|
||||
} else if (OB_FAIL(data_buffer_.write_nop(row_index_size))) {
|
||||
LOG_WARN("advance data buffer failed", K(ret), K(row_index_size));
|
||||
} else {
|
||||
for (int64_t idx = 0; idx < row_indexs_.count(); ++idx) {
|
||||
@ -136,19 +164,20 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
}
|
||||
}
|
||||
|
||||
// <5> fill header
|
||||
// <5> fill header, encoding_meta and fix cols data
|
||||
if (OB_SUCC(ret)) {
|
||||
get_header(data_buffer_)->row_count_ = static_cast<int16_t>(datum_rows_.count());
|
||||
get_header(data_buffer_)->row_count_ = static_cast<uint32_t>(datum_rows_.count());
|
||||
get_header(data_buffer_)->has_string_out_row_ = has_string_out_row_;
|
||||
get_header(data_buffer_)->all_lob_in_row_ = !has_lob_out_row_;
|
||||
|
||||
|
||||
get_header(data_buffer_)->max_merged_trans_version_ = max_merged_trans_version_;
|
||||
const int64_t header_size = get_header(data_buffer_)->header_size_;
|
||||
char *data = data_buffer_.data() + header_size;
|
||||
FOREACH(e, encoders_) {
|
||||
MEMCPY(data, &(*e)->get_column_header(), sizeof(ObColumnHeader));
|
||||
data += sizeof(ObColumnHeader);
|
||||
}
|
||||
// fill encoding meta and fix cols data
|
||||
MEMCPY(data_buffer_.data() + encoding_meta_offset, encoding_meta_buf, encoding_meta_size);
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -161,7 +190,7 @@ int ObMicroBlockRawEncoder::build_block(char *&buf, int64_t &size)
|
||||
ObIColumnEncoder *e = encoders_.at(idx);
|
||||
pe.type_ = static_cast<ObColumnHeader::Type>(e->get_column_header().type_);
|
||||
if (ObColumnHeader::is_inter_column_encoder(pe.type_)) {
|
||||
pe.ref_col_idx_ = static_cast<ObColumnEqualEncoder *>(e)->get_ref_col_idx();
|
||||
pe.ref_col_idx_ = static_cast<ObSpanColumnEncoder *>(e)->get_ref_col_idx();
|
||||
} else {
|
||||
pe.ref_col_idx_ = 0;
|
||||
}
|
||||
@ -214,6 +243,7 @@ public:
|
||||
decode_res_pool_ = new(allocator_.alloc(sizeof(ObDecodeResourcePool))) ObDecodeResourcePool;
|
||||
tenant_ctx_.set(decode_res_pool_);
|
||||
share::ObTenantEnv::set_tenant(&tenant_ctx_);
|
||||
encoder_.encoding_meta_allocator_.set_tenant_id(OB_SERVER_TENANT_ID);
|
||||
encoder_.data_buffer_.allocator_.set_tenant_id(OB_SERVER_TENANT_ID);
|
||||
encoder_.row_buf_holder_.allocator_.set_tenant_id(OB_SERVER_TENANT_ID);
|
||||
decode_res_pool_->init();
|
||||
|
Reference in New Issue
Block a user