Skip Index Bugfix

This commit is contained in:
ND501 2024-11-27 05:45:47 +00:00 committed by ob-robot
parent 1defbff2de
commit c2be6cf322
11 changed files with 427 additions and 71 deletions

View File

@ -30,10 +30,8 @@ ObAggRowWriter::ObAggRowWriter()
agg_datums_(nullptr),
column_count_(0),
col_idx_count_(0),
estimate_data_size_(0),
col_meta_list_(),
header_(),
header_size_(0),
row_helper_()
{}
@ -60,7 +58,7 @@ int ObAggRowWriter::init(const ObIArray<ObSkipIndexColMeta> &agg_col_arr,
} else if (FALSE_IT(agg_datums_ = agg_data.storage_datums_)) {
} else if (OB_FAIL(sort_metas(agg_col_arr, allocator))) {
LOG_WARN("failed to sort agg col metas", K(ret));
} else if (OB_FAIL(calc_estimate_data_size())) {
} else if (OB_FAIL(calc_serialize_agg_buf_size())) {
LOG_WARN("failed to calc estimate data size", K(ret));
} else {
is_inited_ = true;
@ -91,9 +89,11 @@ int ObAggRowWriter::sort_metas(const ObIArray<ObSkipIndexColMeta> &agg_col_arr,
return ret;
}
int ObAggRowWriter::calc_estimate_data_size()
int ObAggRowWriter::calc_serialize_agg_buf_size()
{
int ret = OB_SUCCESS;
int64_t agg_header_size = 0;
int64_t agg_data_size = 0;
header_.pack_ = 0;
header_.agg_col_idx_size_ = 0;
header_.bitmap_size_ = ObAggRowHeader::AGG_COL_TYPE_BITMAP_SIZE;
@ -104,8 +104,7 @@ int ObAggRowWriter::calc_estimate_data_size()
} while(max_col_idx != 0);
col_idx_count_ = 0;
estimate_data_size_ = 0;
header_.agg_col_off_size_ = 1; // default use 1 byte to save offset
header_.cell_off_size_ = 1; // default use 1 byte to save offset
header_.agg_col_idx_off_size_ = 1;
int64_t stored_col_cnt = 0;
for (int64_t i = 0; i < column_count_; /*++i*/) {
@ -128,24 +127,32 @@ int ObAggRowWriter::calc_estimate_data_size()
if (cur_stored_col_cnt > 0) {
++cur_stored_col_cnt; // reserve one more column to save cell size
}
if (header_.agg_col_off_size_ == 1 && cur_cell_size + cur_stored_col_cnt > UINT8_MAX) {
header_.agg_col_off_size_ = 2;
if (header_.cell_off_size_ == 1 && cur_cell_size + cur_stored_col_cnt > UINT8_MAX) {
header_.cell_off_size_ = 2;
}
++col_idx_count_;
estimate_data_size_ += cur_cell_size;
agg_data_size += cur_cell_size;
stored_col_cnt += cur_stored_col_cnt;
i = end; // start next loop
}
estimate_data_size_ += stored_col_cnt * header_.agg_col_off_size_;
if (estimate_data_size_ > UINT8_MAX) {
agg_data_size += stored_col_cnt * header_.cell_off_size_;
agg_header_size = sizeof(ObAggRowHeader) + col_idx_count_ * header_.agg_col_idx_size_
+ col_idx_count_ * header_.agg_col_idx_off_size_;
if (agg_data_size + agg_header_size > UINT8_MAX) {
header_.agg_col_idx_off_size_ = 2;
// We have to update agg header size, because agg_col_idx_off_size has changed.
agg_header_size = sizeof(ObAggRowHeader) + col_idx_count_ * header_.agg_col_idx_size_
+ col_idx_count_ * header_.agg_col_idx_off_size_;
// We don't support larger skip index for now.
if (OB_UNLIKELY(agg_data_size + agg_header_size > UINT16_MAX)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("fail to calculate serialize agg buf size, not support larger skip index",
K(ret), K(header_), K(agg_data_size), K(agg_header_size));
}
}
header_size_ = sizeof(ObAggRowHeader);
header_size_ += col_idx_count_ * header_.agg_col_idx_size_;
header_size_ += col_idx_count_ * header_.agg_col_idx_off_size_;
header_.agg_col_cnt_ = col_idx_count_;
header_.length_ = estimate_data_size_ + header_size_;
header_.length_ = agg_data_size + agg_header_size;
return ret;
}
@ -167,7 +174,7 @@ int ObAggRowWriter::write_cell(
} else if (OB_FAIL(row_helper_.col_bitmap_gen_.init(buf + pos, header_.bitmap_size_))) {
LOG_WARN("failed to init bitmap", K(ret));
} else if (FALSE_IT(pos += header_.bitmap_size_)) {
} else if (OB_FAIL(row_helper_.col_off_gen_.init(buf + pos, header_.agg_col_off_size_))) {
} else if (OB_FAIL(row_helper_.col_off_gen_.init(buf + pos, header_.cell_off_size_))) {
LOG_WARN("failed to init col off arr", K(ret));
} else {
ObIIntegerArray &col_bitmap = row_helper_.col_bitmap_gen_.get_array();
@ -177,7 +184,7 @@ int ObAggRowWriter::write_cell(
if (stored_col_cnt > 0) {
++stored_col_cnt; // reserve one more column to save cell size
}
pos += stored_col_cnt * header_.agg_col_off_size_;
pos += stored_col_cnt * header_.cell_off_size_;
int64_t idx = 0;
int64_t cur = start;
while (OB_SUCC(ret) && idx < stored_col_cnt - 1 && cur < end) {
@ -203,7 +210,7 @@ int ObAggRowWriter::write_cell(
++idx;
}
if (OB_SUCC(ret) && stored_col_cnt > 0) {
LOG_DEBUG("write cell(reserved)", K(idx), K(pos), K(orig_pos), K(header_size_));
LOG_DEBUG("write cell(reserved)", K(idx), K(pos), K(orig_pos), K(header_));
col_off_arr.set(stored_col_cnt - 1, pos - orig_pos); // cell end
}
}
@ -220,10 +227,9 @@ int ObAggRowWriter::write_agg_data(char *buf, const int64_t buf_size, int64_t &p
if (OB_UNLIKELY(!is_inited_)) {
ret = OB_NOT_INIT;
LOG_WARN("not init", K(ret));
} else if (OB_UNLIKELY(buf_size < pos + estimate_data_size_ + header_size_)) {
} else if (OB_UNLIKELY(buf_size < pos + get_serialize_data_size())) {
ret = OB_BUF_NOT_ENOUGH;
LOG_WARN("buf not enough, cannot write data", K(ret), K(buf_size), K(pos),
K_(estimate_data_size), K_(header_size), K_(header));
LOG_WARN("buf not enough, cannot write data", K(ret), K(buf_size), K(pos), K_(header));
} else if (FALSE_IT(pos += sizeof(ObAggRowHeader))) {
} else if (OB_FAIL(row_helper_.col_idx_gen_.init(buf + pos, header_.agg_col_idx_size_))) {
LOG_WARN("failed to init col idx arr", K(ret), K_(header));
@ -261,14 +267,16 @@ int ObAggRowWriter::write_agg_data(char *buf, const int64_t buf_size, int64_t &p
}
}
if (OB_SUCC(ret)) {
header_.length_ = pos - orig_pos;
if (OB_UNLIKELY(!header_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid agg row header", K(ret), K_(header));
} else {
*header = header_;
}
if (OB_FAIL(ret)) {
// do nothing.
} else if (OB_UNLIKELY(!header_.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid agg row header", K(ret), K(header_), K(pos), K(orig_pos));
} else if (OB_UNLIKELY(pos - orig_pos != get_serialize_data_size())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("fail to write agg data, unexpected data size", K(ret), K(header_), K(pos), K(orig_pos));
} else {
*header = header_;
}
}
return ret;
@ -417,11 +425,11 @@ int ObAggRowReader::read_cell(
} else if (FALSE_IT(bit_val = row_helper_.col_bitmap_gen_.get_array().at(0))) {
} else if (!(bit_val & tar_mask)) {
found = false;
} else if (OB_UNLIKELY(buf_size < header_->bitmap_size_ + header_->agg_col_off_size_)) {
} else if (OB_UNLIKELY(buf_size < header_->bitmap_size_ + header_->cell_off_size_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected buf size when bitmap matches", K(ret), K(buf_size), KPC_(header));
} else if (OB_FAIL(row_helper_.col_off_gen_.init(
cell_buf + header_->bitmap_size_, header_->agg_col_off_size_))) {
cell_buf + header_->bitmap_size_, header_->cell_off_size_))) {
LOG_WARN("failed to init col off gen", K(ret));
} else {
found = true;

View File

@ -1,12 +1,14 @@
// Copyright (c) 2021 Ant Group CO., Ltd.
// OceanBase is licensed under Mulan PubL v1.
// You can use this software according to the terms and conditions of the Mulan PubL v1.
// You may obtain a copy of Mulan PubL v1 at:
// http://license.coscl.org.cn/MulanPubL-1.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v1 for more details.
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#ifndef OCEANBASE_STORAGE_BLOCKSSTABLE_OB_AGG_ROW_STRUCT_H
#define OCEANBASE_STORAGE_BLOCKSSTABLE_OB_AGG_ROW_STRUCT_H
@ -31,10 +33,13 @@ public:
public:
ObAggRowHeader();
~ObAggRowHeader() = default;
bool is_valid() const { return version_ == AGG_ROW_HEADER_VERSION && agg_col_cnt_ > 0
&& agg_col_idx_size_ > 0 && agg_col_idx_off_size_ > 0 && bitmap_size_ == AGG_COL_TYPE_BITMAP_SIZE; }
bool is_valid() const
{
return version_ == AGG_ROW_HEADER_VERSION && agg_col_cnt_ > 0 && agg_col_idx_size_ > 0 && agg_col_idx_off_size_ > 0
&& bitmap_size_ == AGG_COL_TYPE_BITMAP_SIZE;
}
TO_STRING_KV(K_(version), K_(length), K_(agg_col_cnt), K_(agg_col_idx_size),
K_(agg_col_idx_off_size), K_(agg_col_off_size), K_(bitmap_size));
K_(agg_col_idx_off_size), K_(cell_off_size), K_(bitmap_size));
public:
int16_t version_;
int16_t length_;
@ -46,7 +51,7 @@ public:
{
uint16_t agg_col_idx_size_ : 6;
uint16_t agg_col_idx_off_size_ : 3;
uint16_t agg_col_off_size_ : 3;
uint16_t cell_off_size_ : 3;
uint16_t bitmap_size_ : 4;
};
};
@ -69,12 +74,12 @@ public:
int init(const ObIArray<ObSkipIndexColMeta> &agg_col_arr,
const ObDatumRow &agg_data,
ObIAllocator &allocator);
int64_t get_data_size() { return estimate_data_size_ + header_size_; }
OB_INLINE int64_t get_serialize_data_size() const { return header_.length_; }
int write_agg_data(char *buf, const int64_t buf_size, int64_t &pos);
void reset();
private:
int sort_metas(const ObIArray<ObSkipIndexColMeta> &agg_col_arr, ObIAllocator &allocator);
int calc_estimate_data_size();
int calc_serialize_agg_buf_size();
int write_cell(
int64_t start,
int64_t end,
@ -91,10 +96,8 @@ private:
const ObStorageDatum *agg_datums_;
int64_t column_count_;
int64_t col_idx_count_;
int64_t estimate_data_size_;
ColMetaList col_meta_list_;
ObAggRowHeader header_;
int64_t header_size_;
ObAggRowHelper row_helper_;
DISALLOW_COPY_AND_ASSIGN(ObAggRowWriter);
};

View File

@ -921,7 +921,14 @@ void ObIndexRowAggInfo::reset()
/* ------------------------------------ObIndexBlockAggregator-------------------------------------*/
ObIndexBlockAggregator::ObIndexBlockAggregator()
: skip_index_aggregator_(), aggregated_row_(), aggregate_info_(), need_data_aggregate_(false), is_inited_(false) {}
: skip_index_aggregator_(),
aggregated_row_(),
aggregate_info_(),
need_data_aggregate_(false),
has_reused_null_agg_in_this_micro_block_(false),
is_inited_(false)
{
}
void ObIndexBlockAggregator::reset()
{
@ -929,6 +936,7 @@ void ObIndexBlockAggregator::reset()
aggregated_row_.reset();
aggregate_info_.reset();
need_data_aggregate_ = false;
has_reused_null_agg_in_this_micro_block_ = false;
is_inited_ = false;
}
@ -936,6 +944,7 @@ void ObIndexBlockAggregator::reuse()
{
skip_index_aggregator_.reuse();
aggregate_info_.reset();
has_reused_null_agg_in_this_micro_block_ = false;
}
int ObIndexBlockAggregator::init(const ObDataStoreDesc &store_desc, ObIAllocator &allocator)
@ -974,10 +983,10 @@ int ObIndexBlockAggregator::eval(const ObIndexBlockRowDesc &row_desc)
} else if (OB_UNLIKELY(!row_desc.is_valid())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid index block row descriptor", K(ret));
} else if (need_data_aggregate_) {
} else if (need_data_aggregate()) {
if (OB_ISNULL(row_desc.aggregated_row_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null pointer for aggregated row", K(ret), K(row_desc));
// There is data that does not contain aggregate row, so we disable skip index aggregate, do nothing here.
has_reused_null_agg_in_this_micro_block_ = true;
} else if (row_desc.is_serialized_agg_row_) {
const ObAggRowHeader *agg_row_header = reinterpret_cast<const ObAggRowHeader *>(
row_desc.serialized_agg_row_buf_);
@ -1005,8 +1014,8 @@ int ObIndexBlockAggregator::get_index_agg_result(ObIndexBlockRowDesc &row_desc)
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not inited", K(ret));
} else if (need_data_aggregate_
&& OB_FAIL(skip_index_aggregator_.get_aggregated_row(row_desc.aggregated_row_))) {
} else if (need_data_aggregate()
&& OB_FAIL(skip_index_aggregator_.get_aggregated_row(row_desc.aggregated_row_))) {
LOG_WARN("Fail to get aggregated row", K(ret));
} else {
aggregate_info_.get_agg_result(row_desc);
@ -1023,7 +1032,7 @@ int ObIndexBlockAggregator::get_index_row_agg_info(ObIndexRowAggInfo &index_row_
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not inited", K(ret));
} else if (need_data_aggregate_) {
} else if (need_data_aggregate()) {
if (OB_FAIL(skip_index_aggregator_.get_aggregated_row(agg_row))) {
LOG_WARN("Fail to get aggregated row", K(ret));
} else if (OB_FAIL(index_row_agg_info.aggregated_row_.init(allocator, aggregated_row_.get_column_count()))) {

View File

@ -232,7 +232,7 @@ public:
int eval(const ObIndexBlockRowDesc &row_desc);
int get_index_agg_result(ObIndexBlockRowDesc &row_desc);
int get_index_row_agg_info(ObIndexRowAggInfo &index_row_agg_info, ObIAllocator &allocator);
inline bool need_data_aggregate() const { return need_data_aggregate_; };
inline bool need_data_aggregate() const { return need_data_aggregate_ && !has_reused_null_agg_in_this_micro_block_; };
inline const ObDatumRow& get_aggregated_row() const { return aggregated_row_; };
inline int64_t get_max_agg_size() { return skip_index_aggregator_.get_max_agg_size(); }
inline int64_t get_row_count() const { return aggregate_info_.row_count_; }
@ -240,12 +240,13 @@ public:
inline bool is_last_row_last_flag() const { return aggregate_info_.is_last_row_last_flag_; }
inline int64_t get_max_merged_trans_version() const { return aggregate_info_.max_merged_trans_version_; }
TO_STRING_KV(K_(skip_index_aggregator), K_(aggregated_row), K_(aggregate_info),
K_(need_data_aggregate), K_(is_inited));
K_(need_data_aggregate), K_(has_reused_null_agg_in_this_micro_block), K_(is_inited));
private:
ObSkipIndexAggregator skip_index_aggregator_;
ObDatumRow aggregated_row_;
ObAggregateInfo aggregate_info_;
bool need_data_aggregate_;
bool has_reused_null_agg_in_this_micro_block_;
bool is_inited_;
};

View File

@ -994,6 +994,7 @@ int ObSSTableIndexBuilder::merge_index_tree_from_meta_block(ObSSTableMergeRes &r
while (OB_SUCC(ret)) {
int64_t absolute_row_offset = -1;
meta_row.reuse();
macro_meta.reset();
if (OB_FAIL(index_block_loader_.get_next_row(meta_row))) {
if (OB_UNLIKELY(ret != OB_ITER_END)) {
STORAGE_LOG(WARN, "fail to get row", K(ret),
@ -2417,8 +2418,14 @@ int ObBaseIndexBlockBuilder::meta_to_row_desc(
row_desc.macro_block_count_ = 1;
row_desc.has_string_out_row_ = macro_meta.val_.has_string_out_row_;
row_desc.has_lob_out_row_ = !macro_meta.val_.all_lob_in_row_;
row_desc.serialized_agg_row_buf_ = macro_meta.val_.agg_row_buf_;
row_desc.is_serialized_agg_row_ = true;
// We have validate macro meta in caller, so we do not validate agg_row_buf and agg_row_len here.
if (nullptr != macro_meta.val_.agg_row_buf_) {
row_desc.serialized_agg_row_buf_ = macro_meta.val_.agg_row_buf_;
row_desc.is_serialized_agg_row_ = true;
} else {
row_desc.serialized_agg_row_buf_ = nullptr;
row_desc.is_serialized_agg_row_ = false;
}
// is_last_row_last_flag_ only used in data macro block
}
return ret;
@ -2458,7 +2465,7 @@ int ObBaseIndexBlockBuilder::row_desc_to_meta(
data_store_desc_->get_agg_meta_array(),
*macro_row_desc.aggregated_row_, allocator))) {
STORAGE_LOG(WARN, "Fail to init aggregate row writer", K(ret));
} else if (FALSE_IT(agg_row_upper_size = agg_row_writer_.get_data_size())) {
} else if (FALSE_IT(agg_row_upper_size = agg_row_writer_.get_serialize_data_size())) {
} else if (OB_ISNULL(agg_row_buf = static_cast<char *>(
allocator.alloc(agg_row_upper_size)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;

View File

@ -82,7 +82,7 @@ int ObIndexBlockRowDesc::init(const ObDataStoreDesc &data_store_desc,
has_string_out_row_ = index_row_header->has_string_out_row_;
has_lob_out_row_ = !index_row_header->all_lob_in_row_;
row_offset_ = idx_row_parser.get_row_offset();
is_serialized_agg_row_ = true;
is_serialized_agg_row_ = false;
const char *agg_row_buf = nullptr;
int64_t agg_buf_size = 0;
@ -99,6 +99,7 @@ int ObIndexBlockRowDesc::init(const ObDataStoreDesc &data_store_desc,
STORAGE_LOG(WARN, "Fail to get aggregate", K(ret));
} else {
serialized_agg_row_buf_ = agg_row_buf;
is_serialized_agg_row_ = true;
}
}
return ret;
@ -323,8 +324,7 @@ int ObIndexBlockRowBuilder::calc_data_size(
size += agg_header->length_;
}
} else {
// agg_writer.get_data_size() is larger than or equal to real serialized size
size += agg_writer.get_data_size();
size += agg_writer.get_serialize_data_size();
}
}
} else {

View File

@ -157,7 +157,8 @@ return (DATA_BLOCK_META_VAL_VERSION == version_ || DATA_BLOCK_META_VAL_VERSION_V
&& row_store_type_ < ObRowStoreType::MAX_ROW_STORE
&& logic_id_.is_valid()
&& macro_id_.is_valid()
&& (0 == agg_row_len_ || nullptr != agg_row_buf_)
&& agg_row_len_ >= 0
&& ((0 == agg_row_len_ && nullptr == agg_row_buf_) || (0 < agg_row_len_ && nullptr != agg_row_buf_))
&& (ddl_end_row_offset_ == -1 || (version_ >= DATA_BLOCK_META_VAL_VERSION_V2 && ddl_end_row_offset_ >= 0));
}
@ -353,7 +354,9 @@ DEFINE_DESERIALIZE(ObDataBlockMetaVal)
is_last_row_last_flag_,
agg_row_len_);
if (OB_SUCC(ret)) {
if (agg_row_len_ > 0) {
if (agg_row_len_ == 0) {
agg_row_buf_ = nullptr;
} else if (agg_row_len_ > 0) {
agg_row_buf_ = buf + pos;
pos += agg_row_len_;
}

View File

@ -8,13 +8,23 @@
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#define USING_LOG_PREFIX STORAGE
#include <errno.h>
#include <gtest/gtest.h>
#include <stdlib.h>
#include <utility>
#include <string>
#include <cstring>
#include <sstream>
#include <iostream>
#define protected public
#define OK(ass) ASSERT_EQ(OB_SUCCESS, (ass))
#define private public
#include "lib/ob_errno.h"
#include "storage/blocksstable/index_block/ob_agg_row_struct.h"
#include "storage/blocksstable/index_block/ob_index_block_util.h"
#include "storage/blocksstable/index_block/ob_index_block_aggregator.h"
namespace oceanbase
{
@ -34,6 +44,14 @@ public:
virtual ~TestAggRow();
virtual void SetUp();
virtual void TearDown();
void prepare_agg_row_writer(ObAggRowWriter &row_writer);
void set_datum(ObStorageDatum &datum, const std::string &hexString, const int64_t len, const uint32_t pack);
void prepare_skip_index_col_metas(ObIArray<oceanbase::blocksstable::ObSkipIndexColMeta> &metas);
int check_serialized_aggregated_row(const ObIArray<oceanbase::blocksstable::ObSkipIndexColMeta> &full_agg_metas,
const char *agg_row_buf,
const int64_t agg_row_len,
const int64_t row_count);
protected:
ObArenaAllocator allocator_;
};
@ -54,6 +72,313 @@ void TestAggRow::TearDown()
{
}
// Reads every meta in full_agg_metas back from the serialized aggregate row stored in
// [agg_row_buf, agg_row_buf + agg_row_len) and sanity-checks the decoded datums.
//
// For each meta:
//   - the read must succeed and the datum must not be an "ext" datum;
//   - if the datum is null, the SK_IDX_NULL_COUNT aggregate of the same column is read
//     as well; it must not be "ext", and a non-null null-count larger than row_count
//     is logged as an error and followed by ob_abort().
//
// @param full_agg_metas  column metas to look up in the serialized row
// @param agg_row_buf     start of the serialized aggregate row
// @param agg_row_len     length in bytes of the serialized aggregate row
// @param row_count       upper bound accepted for a decoded null count
// @return OB_SUCCESS if every meta passes; otherwise the first error encountered
//         (reader init/read error, or OB_ERR_UNEXPECTED for an ext datum).
int TestAggRow::check_serialized_aggregated_row(
    const ObIArray<oceanbase::blocksstable::ObSkipIndexColMeta> &full_agg_metas,
    const char *agg_row_buf,
    const int64_t agg_row_len,
    const int64_t row_count)
{
  int ret = OB_SUCCESS;
  ObStorageDatum tmp_datum;
  ObStorageDatum tmp_null_datum;
  ObAggRowReader agg_reader;
  agg_reader.reset();
  if (OB_FAIL(agg_reader.init(agg_row_buf, agg_row_len))) {
    LOG_WARN("Fail to init agg row reader", K(ret), KP(agg_row_buf), K(agg_row_len));
  } else {
    for (int64_t i = 0; OB_SUCC(ret) && i < full_agg_metas.count(); ++i) {
      // The datums are shared across iterations, so clear them before each read.
      tmp_datum.reuse();
      tmp_null_datum.reuse();
      const ObSkipIndexColMeta &idx_col_meta = full_agg_metas.at(i);
      if (OB_FAIL(agg_reader.read(idx_col_meta, tmp_datum))) {
        LOG_WARN("Fail to read aggregated data", K(ret), K(idx_col_meta));
      } else if (OB_UNLIKELY(tmp_datum.is_ext())) {
        ret = OB_ERR_UNEXPECTED;
        LOG_WARN("Unexpected ext agg datum", K(ret), K(tmp_datum), K(idx_col_meta));
      } else if (tmp_datum.is_null()) {
        // A null aggregate is cross-checked against the null count of the same column.
        ObSkipIndexColMeta null_col_meta(idx_col_meta.col_idx_, SK_IDX_NULL_COUNT);
        if (OB_FAIL(agg_reader.read(null_col_meta, tmp_null_datum))) {
          LOG_WARN("Fail to read aggregated null", K(ret), K(idx_col_meta));
        } else if (tmp_null_datum.is_ext()) {
          ret = OB_ERR_UNEXPECTED;
          LOG_WARN("Unexpected null count datum", K(ret), K(tmp_null_datum), K(idx_col_meta));
        } else if (tmp_null_datum.is_null()) {
          // No null count stored for this column: nothing to compare against.
        } else if (tmp_null_datum.get_int() > row_count) {
          ret = OB_ERR_UNEXPECTED;
          LOG_ERROR("Unexpected null count datum out row count", K(ret),
              K(tmp_null_datum), K(row_count), K(null_col_meta), K(tmp_null_datum.get_int()));
          ob_abort();
        }
      }
    }
  }
  return ret;
}
// Fills `datum` from a whitespace-separated list of hex byte tokens (e.g. "0xf2 0xcc").
//
// A buffer is taken from the fixture allocator, the tokens of `hexString` are parsed as
// base-16 integers and stored byte by byte, then `datum.ptr_` is pointed at the buffer
// and `datum.pack_` is set to `pack` verbatim.
//
// @param datum      datum whose ptr_ / pack_ members are overwritten
// @param hexString  hex byte tokens separated by whitespace; ignored when len <= 0
// @param len        maximum number of bytes that may be written into the buffer
// @param pack       raw value stored into datum.pack_
//
// If `hexString` holds more than `len` tokens, a message is printed to std::cerr and the
// function returns early WITHOUT touching `datum`.
void TestAggRow::set_datum(ObStorageDatum &datum, const std::string &hexString, const int64_t len, const uint32_t pack)
{
  // Keep the historical 40-byte minimum, but grow with `len` so that a payload longer
  // than 40 bytes can no longer write past the end of the allocation.
  const int64_t MIN_BUF_SIZE = 40;
  const int64_t buf_size = len > MIN_BUF_SIZE ? len : MIN_BUF_SIZE;
  char *buffer = static_cast<char *>(allocator_.alloc(buf_size));
  ASSERT_NE(nullptr, buffer);
  // Zero the buffer so bytes not covered by hexString have a defined value.
  MEMSET(buffer, 0, buf_size);

  if (len > 0) {
    std::istringstream iss(hexString);
    std::string hexByte;
    int64_t i = 0;  // signed, to match `len` and avoid a signed/unsigned comparison
    // split the hex string with spaces
    while (iss >> hexByte) {
      if (i >= len) {
        std::cerr << "Buffer size is smaller than the number of hex values." << std::endl;
        return;
      }
      // read hexadecimal string, convert to int, and write into buffer
      unsigned int byteValue = 0;
      std::stringstream ss;
      ss << std::hex << hexByte;
      ss >> byteValue;
      buffer[i++] = static_cast<char>(byteValue);
    }
  }

  // set ptr
  datum.ptr_ = buffer;
  // set desc
  datum.pack_ = pack;
}
// Populates `metas` with three skip-index metas (ObSkipIndexColType 0, 1 and 2) for each
// of the column indexes 0, 1, 2, 3, 4, 7 and 8, in that order: 21 entries in total.
// Any failing reserve/push_back trips a gtest assertion and returns from this helper.
void TestAggRow::prepare_skip_index_col_metas(ObIArray<oceanbase::blocksstable::ObSkipIndexColMeta> & metas)
{
  // Column indexes covered by the fixture; note that 5 and 6 are intentionally absent.
  const int col_ids[] = {0, 1, 2, 3, 4, 7, 8};
  const int col_cnt = static_cast<int>(sizeof(col_ids) / sizeof(col_ids[0]));
  const int types_per_col = 3;

  int ret = metas.reserve(col_cnt * types_per_col);
  ASSERT_EQ(OB_SUCCESS, ret);

  for (int c = 0; c < col_cnt; ++c) {
    for (int t = 0; t < types_per_col; ++t) {
      ret = metas.push_back(ObSkipIndexColMeta(col_ids[c], ObSkipIndexColType(t)));
      ASSERT_EQ(OB_SUCCESS, ret);
    }
  }
}
// Puts `row_writer` into an initialized state without going through ObAggRowWriter::init():
// its col_meta_list_, agg_datums_ and column_count_ members are filled in by hand, then
// calc_serialize_agg_buf_size() is invoked and is_inited_ is set.
//
// The fixture describes 7 columns (indexes 0, 1, 2, 3, 4, 7, 8) with 3 aggregate types
// each, i.e. 21 metas paired with 21 datums; meta k is paired with datum index k.
// Any failing step trips a gtest assertion and returns from this helper.
void TestAggRow::prepare_agg_row_writer(ObAggRowWriter & row_writer)
{
  int ret = OB_SUCCESS;
  const int col_ids[] = {0, 1, 2, 3, 4, 7, 8};
  const int col_cnt = static_cast<int>(sizeof(col_ids) / sizeof(col_ids[0]));
  const int types_per_col = 3;
  const int datum_cnt = col_cnt * types_per_col;  // 21

  /* ---------------------------------- col_meta_list ---------------------------------- */
  row_writer.col_meta_list_.set_allocator(&allocator_);
  ret = row_writer.col_meta_list_.reserve(datum_cnt);
  // The reserve result used to be overwritten unchecked; fail fast here instead.
  ASSERT_EQ(OB_SUCCESS, ret);
  int datum_idx = 0;
  for (int c = 0; c < col_cnt; ++c) {
    for (int t = 0; t < types_per_col; ++t) {
      ret = row_writer.col_meta_list_.push_back(
          std::make_pair(ObSkipIndexColMeta(col_ids[c], ObSkipIndexColType(t)), datum_idx));
      ASSERT_EQ(OB_SUCCESS, ret);
      ++datum_idx;
    }
  }

  /* ---------------------------------- agg_datums ---------------------------------- */
  ObStorageDatum * storage_datums = static_cast<ObStorageDatum *>(allocator_.alloc(sizeof(ObStorageDatum) * datum_cnt));
  ASSERT_NE(nullptr, storage_datums);
  // NOTE(review): the last argument is stored verbatim into ObStorageDatum::pack_.
  // 1073741840 == 0x40000010 and 2147483648 == 0x80000000; presumably the low bits carry
  // the length and the high bits are flags -- confirm against the datum definition.
  // col 0
  set_datum(storage_datums[0], "0xf2 0xcc 0xe9 0xce 0xff 0xff 0xff 0xff", 8, 8);
  set_datum(storage_datums[1], "0xf2 0xcc 0xe9 0xce 0xff 0xff 0xff 0xff", 8, 8);
  set_datum(storage_datums[2], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 1
  set_datum(storage_datums[3], "0x73 0x74 0x72 0x75 0x67 0x67 0x6c 0x65 0x73 0x20 0x61 0x72 0x6d 0x65 0x72 0x20 0x63 0x61 0x77 0x73 0x20 0x61 0x70 0x70 0x6c 0x79 0x20", 27, 27);
  set_datum(storage_datums[4], "0x73 0x74 0x72 0x75 0x67 0x67 0x6c 0x65 0x73 0x20 0x61 0x72 0x6d 0x65 0x72 0x20 0x63 0x61 0x77 0x73 0x20 0x61 0x70 0x70 0x6c 0x79 0x20", 27, 27);
  set_datum(storage_datums[5], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 2
  set_datum(storage_datums[6], "0xc5 0xbe 0xdb 0x19 0x00 0x00 0x00 0x00", 8, 8);
  set_datum(storage_datums[7], "0xc5 0xbe 0xdb 0x19 0x00 0x00 0x00 0x00", 8, 8);
  set_datum(storage_datums[8], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 3
  set_datum(storage_datums[9], "0x63 0x6f 0x6d 0x70 0x72 0x65 0x68 0x65 0x6e 0x64 0x69 0x6e 0x67 0x20 0x64 0x72 0x65 0x73 0x73 0x6d 0x61 0x6b 0x65 0x72 0x27 0x73 0x20 0x65 0x6c", 29, 29);
  set_datum(storage_datums[10], "0x63 0x6f 0x6d 0x70 0x72 0x65 0x68 0x65 0x6e 0x64 0x69 0x6e 0x67 0x20 0x64 0x72 0x65 0x73 0x73 0x6d 0x61 0x6b 0x65 0x72 0x27 0x73 0x20 0x65 0x6c", 29, 29);
  set_datum(storage_datums[11], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 4
  set_datum(storage_datums[12], "0x80 0xac 0x3c 0x2e 0x72 0x00 0x00 0x00", 8, 8);
  set_datum(storage_datums[13], "0x80 0xac 0x3c 0x2e 0x72 0x00 0x00 0x00", 8, 8);
  set_datum(storage_datums[14], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 7
  set_datum(storage_datums[15], "0x19 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x03 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 16, 1073741840);
  set_datum(storage_datums[16], "0x19 0x07 0x00 0x00 0x00 0x00 0x00 0x00 0x03 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 16, 1073741840);
  set_datum(storage_datums[17], "0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  // col 8
  set_datum(storage_datums[18], "", 0, 2147483648);
  set_datum(storage_datums[19], "", 0, 2147483648);
  set_datum(storage_datums[20], "0x01 0x00 0x00 0x00 0x00 0x00 0x00 0x00", 8, 8);
  const ObStorageDatum * const_datums = storage_datums;
  row_writer.agg_datums_ = const_datums;

  /* ---------------------------------- others ---------------------------------- */
  row_writer.column_count_ = datum_cnt;
  // NOTE(review): the allocator was already set before reserve() above; this second call
  // is kept as-is but looks redundant -- confirm before removing.
  row_writer.col_meta_list_.set_allocator(&allocator_);
  ret = row_writer.calc_serialize_agg_buf_size();
  ASSERT_EQ(OB_SUCCESS, ret);
  row_writer.is_inited_ = true;
}
// Round-trip test: serialize the hand-built aggregate row from prepare_agg_row_writer(),
// read each of its 21 (column, type) metas back through ObAggRowReader, then run
// check_serialized_aggregated_row() over the serialized buffer with row_count == 1.
TEST_F(TestAggRow, test_agg_row_serialize_arm)
{
  const int64_t DATUM_CNT = 21;  // number of metas/datums the fixture helpers produce
  int ret = OB_SUCCESS;

  // Serialize into a buffer sized exactly as the writer reports.
  ObAggRowWriter row_writer;
  prepare_agg_row_writer(row_writer);
  int64_t estimate_size = row_writer.get_serialize_data_size();
  char *buf = static_cast<char *>(allocator_.alloc(estimate_size));
  ASSERT_NE(nullptr, buf);
  MEMSET(buf, 0, estimate_size);
  int64_t write_pos = 0;
  ret = row_writer.write_agg_data(buf, estimate_size, write_pos);
  ASSERT_EQ(OB_SUCCESS, ret);

  // Build the lookup metas and open a reader over the bytes that were written.
  common::ObFixedArray<oceanbase::blocksstable::ObSkipIndexColMeta, common::ObIAllocator> metas;
  metas.set_allocator(&allocator_);
  prepare_skip_index_col_metas(metas);
  ObAggRowReader row_reader;
  ret = row_reader.init(buf, write_pos);
  ASSERT_EQ(OB_SUCCESS, ret);

  // Every meta must be readable from the serialized row.
  ObStorageDatum *read_datums = static_cast<ObStorageDatum *>(allocator_.alloc(sizeof(ObStorageDatum) * 21));
  ASSERT_NE(nullptr, read_datums);
  for (int64_t idx = 0; idx < DATUM_CNT; ++idx) {
    ret = row_reader.read(metas[idx], read_datums[idx]);
    ASSERT_EQ(OB_SUCCESS, ret);
  }

  ret = check_serialized_aggregated_row(metas, buf, write_pos, 1);
  ASSERT_EQ(OB_SUCCESS, ret);
}
TEST_F(TestAggRow, test_agg_row)
{
int ret = OB_SUCCESS;
@ -87,7 +412,7 @@ TEST_F(TestAggRow, test_agg_row)
ObAggRowWriter row_writer;
OK(row_writer.init(agg_cols, agg_row, allocator_));
int64_t buf_size = row_writer.get_data_size();
int64_t buf_size = row_writer.get_serialize_data_size();
char *buf = reinterpret_cast<char *>(allocator_.alloc(buf_size));
ASSERT_NE(nullptr, buf);
MEMSET(buf, 0, buf_size);
@ -110,8 +435,8 @@ TEST_F(TestAggRow, test_agg_row)
row_reader.reset();
}
}//end namespace unittest
}//end namespace oceanbase
} // end namespace unittest
} // end namespace oceanbase
int main(int argc, char **argv)
{

View File

@ -343,7 +343,7 @@ void TestIndexBlockAggregator::serialize_agg_row(
int64_t size = 0;
int64_t pos = 0;
ASSERT_EQ(OB_SUCCESS, agg_row_writer_.init(full_agg_metas_, agg_row, allocator_));
size = agg_row_writer_.get_data_size();
size = agg_row_writer_.get_serialize_data_size();
buf = static_cast<char *>(allocator_.alloc(size));
ASSERT_TRUE(nullptr != buf);
ASSERT_EQ(OB_SUCCESS, agg_row_writer_.write_agg_data(buf, size, pos));

View File

@ -365,7 +365,7 @@ int TestSkipIndexFilter::test_skip_index_filter_pushdown (
ObAggRowWriter row_writer;
row_writer.init(agg_cols, agg_row, allocator_);
int64_t buf_size = row_writer.get_data_size();
int64_t buf_size = row_writer.get_serialize_data_size();
char *buf = reinterpret_cast<char *>(allocator_.alloc(buf_size));
EXPECT_TRUE(buf != nullptr);
MEMSET(buf, 0, buf_size);

View File

@ -355,7 +355,7 @@ void TestSSTableIndexFilter::init_micro_index_info(
ObAggRowWriter row_writer;
row_writer.init(agg_cols, agg_row, allocator_);
int64_t buf_size = row_writer.get_data_size();
int64_t buf_size = row_writer.get_serialize_data_size();
char *buf = reinterpret_cast<char *>(allocator_.alloc(buf_size));
EXPECT_TRUE(buf != nullptr);
MEMSET(buf, 0, buf_size);