[SKIP INDEX] adapt index block aggregate data is null.

This commit is contained in:
obdev
2024-02-06 17:46:29 +00:00
committed by ob-robot
parent 33e94551f4
commit 673fe94c68
4 changed files with 135 additions and 75 deletions

View File

@ -21,6 +21,35 @@ using namespace common;
namespace blocksstable
{
int ObIColAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(result_)) {
ret = OB_INIT_TWICE;
LOG_WARN("Init twice", K(ret));
} else {
col_desc_ = col_desc;
result_ = &result;
result_->set_null();
if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) {
set_not_aggregate();
}
}
return ret;
}
void ObIColAggregator::reuse()
{
if (nullptr != result_) {
result_->set_null();
}
if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) {
set_not_aggregate();
} else {
can_aggregate_ = true;
}
}
int ObIColAggregator::copy_agg_datum(const ObDatum &src, ObDatum &dst)
{
int ret = OB_SUCCESS;
@ -40,28 +69,18 @@ int ObIColAggregator::copy_agg_datum(const ObDatum &src, ObDatum &dst)
int ObColNullCountAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(result_)) {
ret = OB_INIT_TWICE;
LOG_WARN("Init twice", K(ret));
if (OB_FAIL(ObIColAggregator::init(col_desc, result))) {
LOG_WARN("fail to init ObIColAggregator", K(ret));
} else {
null_count_ = 0;
result_ = &result;
if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) {
set_not_aggregate();
}
col_desc_ = col_desc;
}
return ret;
}
void ObColNullCountAggregator::reuse()
{
ObIColAggregator::reuse();
null_count_ = 0;
if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) {
set_not_aggregate();
} else {
can_aggregate_ = true;
}
}
int ObColNullCountAggregator::eval(const ObStorageDatum &datum, const bool is_data)
@ -103,19 +122,12 @@ int ObColNullCountAggregator::get_result(const ObStorageDatum *&result)
int ObColMaxAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(result_)) {
ret = OB_INIT_TWICE;
LOG_WARN("Init twice", K(ret));
if (OB_FAIL(ObIColAggregator::init(col_desc, result))) {
LOG_WARN("fail to init ObIColAggregator", K(ret));
} else {
sql::ObExprBasicFuncs *basic_funcs = ObDatumFuncs::get_basic_func(
col_desc.col_type_.get_type(), col_desc.col_type_.get_collation_type());
cmp_func_ = basic_funcs->null_first_cmp_;
result_ = &result;
result_->set_null();
if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) {
set_not_aggregate();
}
col_desc_ = col_desc;
LOG_DEBUG("[SKIP INDEX] init max aggregator", K(col_desc_), K(can_aggregate_));
}
return ret;
@ -123,14 +135,7 @@ int ObColMaxAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
void ObColMaxAggregator::reuse()
{
if (nullptr != result_) {
result_->set_null();
}
if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) {
set_not_aggregate();
} else {
can_aggregate_ = true;
}
ObIColAggregator::reuse();
}
int ObColMaxAggregator::eval(const ObStorageDatum &datum, const bool is_data)
@ -174,19 +179,12 @@ int ObColMaxAggregator::get_result(const ObStorageDatum *&result)
int ObColMinAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
{
int ret = OB_SUCCESS;
if (OB_NOT_NULL(result_)) {
ret = OB_INIT_TWICE;
LOG_WARN("Init twice", K(ret));
if (OB_FAIL(ObIColAggregator::init(col_desc, result))) {
LOG_WARN("fail to init ObIColAggregator", K(ret));
} else {
sql::ObExprBasicFuncs *basic_funcs = ObDatumFuncs::get_basic_func(
col_desc.col_type_.get_type(), col_desc.col_type_.get_collation_type());
cmp_func_ = basic_funcs->null_last_cmp_;
result_ = &result;
result_->set_null();
if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) {
set_not_aggregate();
}
col_desc_ = col_desc;
LOG_DEBUG("[SKIP INDEX] init min aggregator", K(col_desc_), K(can_aggregate_));
}
return ret;
@ -194,14 +192,7 @@ int ObColMinAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result)
void ObColMinAggregator::reuse()
{
if (nullptr != result_) {
result_->set_null();
}
if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) {
set_not_aggregate();
} else {
can_aggregate_ = true;
}
ObIColAggregator::reuse();
}
int ObColMinAggregator::eval(const ObStorageDatum &datum, const bool is_data)
@ -352,10 +343,11 @@ int ObSkipIndexAggregator::eval(const ObDatumRow &datum_row)
return ret;
}
int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size)
int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size, const int64_t row_count)
{
int ret = OB_SUCCESS;
ObStorageDatum tmp_datum;
ObStorageDatum tmp_null_datum;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
LOG_WARN("Not init", K(ret));
@ -368,13 +360,32 @@ int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size)
evaluated_ = true;
for (int64_t i = 0; OB_SUCC(ret) && i < full_agg_metas_->count(); ++i) {
tmp_datum.reuse();
tmp_null_datum.reuse();
const ObSkipIndexColMeta &idx_col_meta = full_agg_metas_->at(i);
if (OB_FAIL(agg_row_reader_.read(idx_col_meta, tmp_datum))) {
LOG_WARN("Fail to read aggregated data", K(ret));
LOG_WARN("Fail to read aggregated data", K(ret), K(idx_col_meta));
} else if (OB_UNLIKELY(tmp_datum.is_ext())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected ext agg datum", K(ret), K(tmp_datum));
} else if (OB_FAIL(col_aggs_.at(i)->eval(tmp_datum, false))) {
LOG_WARN("Unexpected ext agg datum", K(ret), K(tmp_datum), K(idx_col_meta));
} else if (tmp_datum.is_null()) {
ObSkipIndexColMeta null_col_meta(idx_col_meta.col_idx_, SK_IDX_NULL_COUNT);
if (OB_FAIL(agg_row_reader_.read(null_col_meta, tmp_null_datum))) {
LOG_WARN("Fail to read aggregated null", K(ret), K(idx_col_meta));
} else if (tmp_null_datum.is_ext()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("Unexpected null count datum", K(ret), K(tmp_null_datum), K(idx_col_meta));
} else if (tmp_null_datum.is_null()) {
col_aggs_.at(i)->set_not_aggregate();
} else if (tmp_null_datum.get_int() > row_count) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("Unexpected null count datum out row count", K(ret),
K(tmp_null_datum), K(row_count), K(null_col_meta));
} else if (tmp_null_datum.get_int() < row_count) {
col_aggs_.at(i)->set_not_aggregate();
}
}
if (FAILEDx(col_aggs_.at(i)->eval(tmp_datum, false))) {
col_aggs_.at(i)->set_not_aggregate();
LOG_ERROR("Fail to eval aggregate column", K(ret), K(tmp_datum), K_(is_data),
K(idx_col_meta), K(i), K(col_aggs_.at(i)->get_col_decs()));
@ -659,7 +670,7 @@ int ObIndexBlockAggregator::eval(const ObIndexBlockRowDesc &row_desc)
ret = OB_INVALID_ARGUMENT;
LOG_WARN("Invalid aggregated row header", K(ret), KPC(agg_row_header));
} else if (OB_FAIL(skip_index_aggregator_.eval(
row_desc.serialized_agg_row_buf_, agg_row_header->length_))) {
row_desc.serialized_agg_row_buf_, agg_row_header->length_, row_desc.row_count_))) {
LOG_WARN("Fail to aggregate serialized index row", K(ret), K(row_desc), KPC(agg_row_header));
}
} else if (OB_FAIL(skip_index_aggregator_.eval(*row_desc.aggregated_row_))) {

View File

@ -24,12 +24,12 @@ namespace blocksstable
class ObIColAggregator
{
public:
ObIColAggregator() : col_desc_(), can_aggregate_(true) {}
ObIColAggregator() : col_desc_(), result_(nullptr), can_aggregate_(true) {}
virtual ~ObIColAggregator() {}
virtual int init(const ObColDesc &col_desc, ObStorageDatum &result) = 0;
virtual void reset() = 0;
virtual void reuse() = 0;
virtual void reuse();
virtual int eval(const ObStorageDatum &datum, const bool is_data) = 0;
virtual int get_result(const ObStorageDatum *&result) = 0;
VIRTUAL_TO_STRING_KV(K_(can_aggregate));
@ -47,13 +47,14 @@ protected:
}
protected:
ObColDesc col_desc_;
ObStorageDatum *result_;
bool can_aggregate_;
};
class ObColNullCountAggregator : public ObIColAggregator
{
public:
ObColNullCountAggregator() : null_count_(0), result_(nullptr) {}
ObColNullCountAggregator() : null_count_(0) {}
virtual ~ObColNullCountAggregator() {}
int init(const ObColDesc &col_desc, ObStorageDatum &result) override;
@ -63,14 +64,13 @@ public:
int get_result(const ObStorageDatum *&result) override;
private:
int64_t null_count_;
ObStorageDatum *result_;
DISALLOW_COPY_AND_ASSIGN(ObColNullCountAggregator);
};
class ObColMaxAggregator : public ObIColAggregator
{
public:
ObColMaxAggregator() : cmp_func_(nullptr), result_(nullptr) {}
ObColMaxAggregator() : cmp_func_(nullptr) {}
virtual ~ObColMaxAggregator() {}
int init(const ObColDesc &col_desc, ObStorageDatum &result) override;
@ -80,14 +80,13 @@ public:
int get_result(const ObStorageDatum *&result) override;
private:
common::ObDatumCmpFuncType cmp_func_;
ObStorageDatum *result_;
DISALLOW_COPY_AND_ASSIGN(ObColMaxAggregator);
};
class ObColMinAggregator : public ObIColAggregator
{
public:
ObColMinAggregator() : cmp_func_(nullptr), result_(nullptr) {}
ObColMinAggregator() : cmp_func_(nullptr) {}
virtual ~ObColMinAggregator() {}
int init(const ObColDesc &col_desc, ObStorageDatum &result) override;
@ -97,7 +96,6 @@ public:
int get_result(const ObStorageDatum *&result) override;
private:
common::ObDatumCmpFuncType cmp_func_;
ObStorageDatum *result_;
DISALLOW_COPY_AND_ASSIGN(ObColMinAggregator);
};
@ -120,7 +118,7 @@ public:
// Aggregate with datum row
int eval(const ObDatumRow &datum_row);
// Aggregate with serialized agg row
int eval(const char *buf, const int64_t buf_size);
int eval(const char *buf, const int64_t buf_size, const int64_t row_count);
// Generate aggregated row for serialization
int get_aggregated_row(const ObDatumRow *&aggregated_row);
int64_t get_max_agg_size() { return max_agg_size_; }

View File

@ -1869,7 +1869,8 @@ int ObMacroBlockWriter::agg_micro_block(const ObMicroIndexInfo &micro_index_info
{
int ret = OB_SUCCESS;
if (micro_index_info.is_pre_aggregated() && nullptr != data_aggregator_) {
if (OB_FAIL(data_aggregator_->eval(micro_index_info.agg_row_buf_, micro_index_info.agg_buf_size_))) {
if (OB_FAIL(data_aggregator_->eval(micro_index_info.agg_row_buf_,
micro_index_info.agg_buf_size_, micro_index_info.get_row_count()))) {
LOG_WARN("Fail to evaluate by micro block", K(ret), K(micro_index_info));
}
}

View File

@ -41,9 +41,9 @@ public:
void generate_row_by_seed(const int64_t seed, ObDatumRow &datum_row);
void reset_min_max_row();
void update_min_max_row(const ObDatumRow &row);
void validate_agg_row(const ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr);
void set_nop_cols(ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr);
bool is_col_in_nop_col_arr(const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs);
void validate_agg_row(const ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr, ObSkipIndexColType *nop_col_types = nullptr);
void set_nop_cols(ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr, ObSkipIndexColType *nop_col_types = nullptr);
bool is_col_in_nop_col_arr(const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs, int64_t &index);
void serialize_agg_row(const ObDatumRow &agg_row, const char *&row_buf, int64_t &row_size);
void get_cmp_func(const ObColDesc &col_desc, ObStorageDatumCmpFunc &cmp_func);
@ -182,13 +182,14 @@ void TestIndexBlockAggregator::update_min_max_row(const ObDatumRow &row)
}
void TestIndexBlockAggregator::validate_agg_row(
const ObDatumRow &datum_row, int64_t nop_col_cnt, int64_t *nop_col_idxs)
const ObDatumRow &datum_row, int64_t nop_col_cnt, int64_t *nop_col_idxs, ObSkipIndexColType *nop_col_types)
{
for (int64_t i = 0; i < full_agg_metas_.count(); ++i) {
ObSkipIndexColMeta idx_meta = full_agg_metas_.at(i);
const int64_t col_idx = idx_meta.col_idx_;
bool is_nop_column = is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs);
if (is_nop_column) {
int64_t index = 0;
bool is_nop_column = is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs, index);
if (is_nop_column && ((nop_col_types == nullptr ) || (nop_col_types != nullptr && nop_col_types[index] == idx_meta.col_type_))) {
ASSERT_TRUE(datum_row.storage_datums_[i].is_nop());
} else if (datum_row.storage_datums_[i].is_nop() || datum_row.storage_datums_[i].is_null()) { // skip for not aggregate data
ASSERT_TRUE(min_row_.storage_datums_[col_idx].is_null());
@ -223,25 +224,33 @@ void TestIndexBlockAggregator::validate_agg_row(
}
void TestIndexBlockAggregator::set_nop_cols(
ObDatumRow &row, int64_t nop_col_cnt, int64_t *nop_col_idxs)
ObDatumRow &row, int64_t nop_col_cnt, int64_t *nop_col_idxs, ObSkipIndexColType *nop_col_types)
{
for (int64_t i = 0; i < full_agg_metas_.count(); ++i) {
ASSERT_TRUE(i < row.get_column_count());
ObSkipIndexColMeta idx_meta = full_agg_metas_.at(i);
const int64_t col_idx = idx_meta.col_idx_;
if (is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs)) {
int64_t index = 0;
if (is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs, index)) {
if (nop_col_types != nullptr) {
if (nop_col_types[index] == idx_meta.col_type_) {
row.storage_datums_[i].set_nop();
}
} else {
row.storage_datums_[i].set_nop();
}
}
}
}
bool TestIndexBlockAggregator::is_col_in_nop_col_arr(
const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs)
const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs, int64_t &index)
{
bool is_nop_column = false;
for (int64_t i = 0; i < nop_col_cnt; ++i) {
if (col_idx == nop_col_idxs[i]) {
is_nop_column = true;
index = i;
break;
}
}
@ -310,14 +319,14 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate)
ASSERT_EQ(OB_SUCCESS, data_aggregator.eval(generate_row));
ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row));
ASSERT_TRUE(nullptr != data_agg_row);
if (0 == test_row_cnt / 2) {
if (0 == i / 2) {
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row));
} else {
const char *row_buf = nullptr;
int64_t row_size = 0;
serialize_agg_row(*data_agg_row, row_buf, row_size);
ASSERT_TRUE(nullptr != row_buf);
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size));
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i));
}
ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row));
ASSERT_TRUE(nullptr != index_agg_row);
@ -348,7 +357,15 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate)
ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row));
ASSERT_TRUE(nullptr != data_agg_row);
set_nop_cols(*const_cast<ObDatumRow *>(data_agg_row), nop_col_cnt, nop_col_idxs);
if (0 == i / 2) {
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row));
} else {
const char *row_buf = nullptr;
int64_t row_size = 0;
serialize_agg_row(*data_agg_row, row_buf, row_size);
ASSERT_TRUE(nullptr != row_buf);
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i));
}
ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row));
ASSERT_TRUE(nullptr != index_agg_row);
update_min_max_row(generate_row);
@ -356,6 +373,39 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate)
validate_agg_row(*index_agg_row, nop_col_cnt, nop_col_idxs);
}
// test null index row
ObSkipIndexColType nop_col_types[3] = {SK_IDX_MAX, SK_IDX_MIN, SK_IDX_NULL_COUNT};
reset_min_max_row();
data_agg_result.reuse();
index_agg_result.reuse();
data_aggregator.reuse();
index_aggregator.reuse();
data_agg_row = nullptr;
index_agg_row = nullptr;
for (int64_t i = 0; i < test_row_cnt; ++i) {
const int64_t seed = random() % test_row_cnt;
generate_row_by_seed(seed, generate_row);
ASSERT_EQ(OB_SUCCESS, data_aggregator.eval(generate_row));
ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row));
ASSERT_TRUE(nullptr != data_agg_row);
set_nop_cols(*const_cast<ObDatumRow *>(data_agg_row), nop_col_cnt, nop_col_idxs, nop_col_types);
if (0 == i / 2) {
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row));
} else {
const char *row_buf = nullptr;
int64_t row_size = 0;
serialize_agg_row(*data_agg_row, row_buf, row_size);
ASSERT_TRUE(nullptr != row_buf);
ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i));
}
ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row));
ASSERT_TRUE(nullptr != index_agg_row);
update_min_max_row(generate_row);
validate_agg_row(*data_agg_row, nop_col_cnt, nop_col_idxs, nop_col_types);
validate_agg_row(*index_agg_row, nop_col_cnt, nop_col_idxs, nop_col_types);
}
// test reuse
reset_min_max_row();
data_agg_result.reuse();