diff --git a/src/storage/blocksstable/index_block/ob_index_block_aggregator.cpp b/src/storage/blocksstable/index_block/ob_index_block_aggregator.cpp index 03ca468b29..f7a97a440b 100644 --- a/src/storage/blocksstable/index_block/ob_index_block_aggregator.cpp +++ b/src/storage/blocksstable/index_block/ob_index_block_aggregator.cpp @@ -21,6 +21,35 @@ using namespace common; namespace blocksstable { +int ObIColAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) +{ + int ret = OB_SUCCESS; + if (OB_NOT_NULL(result_)) { + ret = OB_INIT_TWICE; + LOG_WARN("Init twice", K(ret)); + } else { + col_desc_ = col_desc; + result_ = &result; + result_->set_null(); + if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) { + set_not_aggregate(); + } + } + return ret; +} + +void ObIColAggregator::reuse() +{ + if (nullptr != result_) { + result_->set_null(); + } + if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) { + set_not_aggregate(); + } else { + can_aggregate_ = true; + } +} + int ObIColAggregator::copy_agg_datum(const ObDatum &src, ObDatum &dst) { int ret = OB_SUCCESS; @@ -40,28 +69,18 @@ int ObIColAggregator::copy_agg_datum(const ObDatum &src, ObDatum &dst) int ObColNullCountAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) { int ret = OB_SUCCESS; - if (OB_NOT_NULL(result_)) { - ret = OB_INIT_TWICE; - LOG_WARN("Init twice", K(ret)); + if (OB_FAIL(ObIColAggregator::init(col_desc, result))) { + LOG_WARN("fail to init ObIColAggregator", K(ret)); } else { null_count_ = 0; - result_ = &result; - if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) { - set_not_aggregate(); - } - col_desc_ = col_desc; } return ret; } void ObColNullCountAggregator::reuse() { + ObIColAggregator::reuse(); null_count_ = 0; - if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) { - set_not_aggregate(); - } else { - can_aggregate_ = true; - } } int ObColNullCountAggregator::eval(const ObStorageDatum &datum, const bool is_data) @@ -103,19 +122,12 @@ int ObColNullCountAggregator::get_result(const ObStorageDatum *&result) int ObColMaxAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) { int ret = OB_SUCCESS; - if (OB_NOT_NULL(result_)) { - ret = OB_INIT_TWICE; - LOG_WARN("Init twice", K(ret)); + if (OB_FAIL(ObIColAggregator::init(col_desc, result))) { + LOG_WARN("fail to init ObIColAggregator", K(ret)); } else { sql::ObExprBasicFuncs *basic_funcs = ObDatumFuncs::get_basic_func( col_desc.col_type_.get_type(), col_desc.col_type_.get_collation_type()); cmp_func_ = basic_funcs->null_first_cmp_; - result_ = &result; - result_->set_null(); - if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) { - set_not_aggregate(); - } - col_desc_ = col_desc; LOG_DEBUG("[SKIP INDEX] init max aggregator", K(col_desc_), K(can_aggregate_)); } return ret; @@ -123,14 +135,7 @@ int ObColMaxAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) void ObColMaxAggregator::reuse() { - if (nullptr != result_) { - result_->set_null(); - } - if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) { - set_not_aggregate(); - } else { - can_aggregate_ = true; - } + ObIColAggregator::reuse(); } int ObColMaxAggregator::eval(const ObStorageDatum &datum, const bool is_data) @@ -174,19 +179,12 @@ int ObColMaxAggregator::get_result(const ObStorageDatum *&result) int ObColMinAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) { int ret = OB_SUCCESS; - if (OB_NOT_NULL(result_)) { - ret = OB_INIT_TWICE; - LOG_WARN("Init twice", K(ret)); + if (OB_FAIL(ObIColAggregator::init(col_desc, result))) { + LOG_WARN("fail to init ObIColAggregator", K(ret)); } else { sql::ObExprBasicFuncs *basic_funcs = ObDatumFuncs::get_basic_func( col_desc.col_type_.get_type(), col_desc.col_type_.get_collation_type()); cmp_func_ = basic_funcs->null_last_cmp_; - result_ = &result; - result_->set_null(); - if (is_skip_index_black_list_type(col_desc.col_type_.get_type())) { - set_not_aggregate(); - } - col_desc_ = col_desc; LOG_DEBUG("[SKIP INDEX] init min aggregator", K(col_desc_), K(can_aggregate_)); } return ret; @@ -194,14 +192,7 @@ int ObColMinAggregator::init(const ObColDesc &col_desc, ObStorageDatum &result) void ObColMinAggregator::reuse() { - if (nullptr != result_) { - result_->set_null(); - } - if (is_skip_index_black_list_type(col_desc_.col_type_.get_type())) { - set_not_aggregate(); - } else { - can_aggregate_ = true; - } + ObIColAggregator::reuse(); } int ObColMinAggregator::eval(const ObStorageDatum &datum, const bool is_data) @@ -352,10 +343,11 @@ int ObSkipIndexAggregator::eval(const ObDatumRow &datum_row) return ret; } -int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size) +int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size, const int64_t row_count) { int ret = OB_SUCCESS; ObStorageDatum tmp_datum; + ObStorageDatum tmp_null_datum; if (IS_NOT_INIT) { ret = OB_NOT_INIT; LOG_WARN("Not init", K(ret)); @@ -368,13 +360,32 @@ int ObSkipIndexAggregator::eval(const char *buf, const int64_t buf_size) evaluated_ = true; for (int64_t i = 0; OB_SUCC(ret) && i < full_agg_metas_->count(); ++i) { tmp_datum.reuse(); + tmp_null_datum.reuse(); const ObSkipIndexColMeta &idx_col_meta = full_agg_metas_->at(i); if (OB_FAIL(agg_row_reader_.read(idx_col_meta, tmp_datum))) { - LOG_WARN("Fail to read aggregated data", K(ret)); + LOG_WARN("Fail to read aggregated data", K(ret), K(idx_col_meta)); } else if (OB_UNLIKELY(tmp_datum.is_ext())) { ret = OB_ERR_UNEXPECTED; - LOG_WARN("Unexpected ext agg datum", K(ret), K(tmp_datum)); - } else if (OB_FAIL(col_aggs_.at(i)->eval(tmp_datum, false))) { + LOG_WARN("Unexpected ext agg datum", K(ret), K(tmp_datum), K(idx_col_meta)); + } else if (tmp_datum.is_null()) { + ObSkipIndexColMeta null_col_meta(idx_col_meta.col_idx_, SK_IDX_NULL_COUNT); + if (OB_FAIL(agg_row_reader_.read(null_col_meta, tmp_null_datum))) { + LOG_WARN("Fail to read aggregated null", K(ret), K(idx_col_meta)); + } else if (tmp_null_datum.is_ext()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("Unexpected null count datum", K(ret), K(tmp_null_datum), K(idx_col_meta)); + } else if (tmp_null_datum.is_null()) { + col_aggs_.at(i)->set_not_aggregate(); + } else if (tmp_null_datum.get_int() > row_count) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("Unexpected null count datum out row count", K(ret), + K(tmp_null_datum), K(row_count), K(null_col_meta)); + } else if (tmp_null_datum.get_int() < row_count) { + col_aggs_.at(i)->set_not_aggregate(); + } + } + + if (FAILEDx(col_aggs_.at(i)->eval(tmp_datum, false))) { col_aggs_.at(i)->set_not_aggregate(); LOG_ERROR("Fail to eval aggregate column", K(ret), K(tmp_datum), K_(is_data), K(idx_col_meta), K(i), K(col_aggs_.at(i)->get_col_decs())); @@ -659,7 +670,7 @@ int ObIndexBlockAggregator::eval(const ObIndexBlockRowDesc &row_desc) ret = OB_INVALID_ARGUMENT; LOG_WARN("Invalid aggregated row header", K(ret), KPC(agg_row_header)); } else if (OB_FAIL(skip_index_aggregator_.eval( - row_desc.serialized_agg_row_buf_, agg_row_header->length_))) { + row_desc.serialized_agg_row_buf_, agg_row_header->length_, row_desc.row_count_))) { LOG_WARN("Fail to aggregate serialized index row", K(ret), K(row_desc), KPC(agg_row_header)); } } else if (OB_FAIL(skip_index_aggregator_.eval(*row_desc.aggregated_row_))) { diff --git a/src/storage/blocksstable/index_block/ob_index_block_aggregator.h b/src/storage/blocksstable/index_block/ob_index_block_aggregator.h index 3746d99b41..baa2f19ffe 100644 --- a/src/storage/blocksstable/index_block/ob_index_block_aggregator.h +++ b/src/storage/blocksstable/index_block/ob_index_block_aggregator.h @@ -24,12 +24,12 @@ namespace blocksstable class ObIColAggregator { public: - ObIColAggregator() : col_desc_(), can_aggregate_(true) {} + ObIColAggregator() : col_desc_(), result_(nullptr), can_aggregate_(true) {} virtual ~ObIColAggregator() {} virtual int init(const ObColDesc &col_desc, ObStorageDatum &result) = 0; virtual void reset() = 0; - virtual void reuse() = 0; + virtual void reuse(); virtual int eval(const ObStorageDatum &datum, const bool is_data) = 0; virtual int get_result(const ObStorageDatum *&result) = 0; VIRTUAL_TO_STRING_KV(K_(can_aggregate)); @@ -47,13 +47,14 @@ protected: } protected: ObColDesc col_desc_; + ObStorageDatum *result_; bool can_aggregate_; }; class ObColNullCountAggregator : public ObIColAggregator { public: - ObColNullCountAggregator() : null_count_(0), result_(nullptr) {} + ObColNullCountAggregator() : null_count_(0) {} virtual ~ObColNullCountAggregator() {} int init(const ObColDesc &col_desc, ObStorageDatum &result) override; @@ -63,14 +64,13 @@ public: int get_result(const ObStorageDatum *&result) override; private: int64_t null_count_; - ObStorageDatum *result_; DISALLOW_COPY_AND_ASSIGN(ObColNullCountAggregator); }; class ObColMaxAggregator : public ObIColAggregator { public: - ObColMaxAggregator() : cmp_func_(nullptr), result_(nullptr) {} + ObColMaxAggregator() : cmp_func_(nullptr) {} virtual ~ObColMaxAggregator() {} int init(const ObColDesc &col_desc, ObStorageDatum &result) override; @@ -80,14 +80,13 @@ public: int get_result(const ObStorageDatum *&result) override; private: common::ObDatumCmpFuncType cmp_func_; - ObStorageDatum *result_; DISALLOW_COPY_AND_ASSIGN(ObColMaxAggregator); }; class ObColMinAggregator : public ObIColAggregator { public: - ObColMinAggregator() : cmp_func_(nullptr), result_(nullptr) {} + ObColMinAggregator() : cmp_func_(nullptr) {} virtual ~ObColMinAggregator() {} int init(const ObColDesc &col_desc, ObStorageDatum &result) override; @@ -97,7 +96,6 @@ public: int get_result(const ObStorageDatum *&result) override; private: common::ObDatumCmpFuncType cmp_func_; - ObStorageDatum *result_; DISALLOW_COPY_AND_ASSIGN(ObColMinAggregator); }; @@ -120,7 +118,7 @@ public: // Aggregate with datum row int eval(const ObDatumRow &datum_row); // Aggregate with serialized agg row - int eval(const char *buf, const int64_t buf_size); + int eval(const char *buf, const int64_t buf_size, const int64_t row_count); // Generate aggregated row for serialization int get_aggregated_row(const ObDatumRow *&aggregated_row); int64_t get_max_agg_size() { return max_agg_size_; } diff --git a/src/storage/blocksstable/ob_macro_block_writer.cpp b/src/storage/blocksstable/ob_macro_block_writer.cpp index 5709cc26e6..06b9826375 100644 --- a/src/storage/blocksstable/ob_macro_block_writer.cpp +++ b/src/storage/blocksstable/ob_macro_block_writer.cpp @@ -1869,7 +1869,8 @@ int ObMacroBlockWriter::agg_micro_block(const ObMicroIndexInfo µ_index_info { int ret = OB_SUCCESS; if (micro_index_info.is_pre_aggregated() && nullptr != data_aggregator_) { - if (OB_FAIL(data_aggregator_->eval(micro_index_info.agg_row_buf_, micro_index_info.agg_buf_size_))) { + if (OB_FAIL(data_aggregator_->eval(micro_index_info.agg_row_buf_, + micro_index_info.agg_buf_size_, micro_index_info.get_row_count()))) { LOG_WARN("Fail to evaluate by micro block", K(ret), K(micro_index_info)); } } diff --git a/unittest/storage/blocksstable/test_index_block_aggregator.cpp b/unittest/storage/blocksstable/test_index_block_aggregator.cpp index 386b3b9c41..1a6cdf887d 100644 --- a/unittest/storage/blocksstable/test_index_block_aggregator.cpp +++ b/unittest/storage/blocksstable/test_index_block_aggregator.cpp @@ -41,9 +41,9 @@ public: void generate_row_by_seed(const int64_t seed, ObDatumRow &datum_row); void reset_min_max_row(); void update_min_max_row(const ObDatumRow &row); - void validate_agg_row(const ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr); - void set_nop_cols(ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr); - bool is_col_in_nop_col_arr(const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs); + void validate_agg_row(const ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr, ObSkipIndexColType *nop_col_types = nullptr); + void set_nop_cols(ObDatumRow &row, int64_t nop_col_cnt = 0, int64_t *nop_col_idxs = nullptr, ObSkipIndexColType *nop_col_types = nullptr); + bool is_col_in_nop_col_arr(const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs, int64_t &index); void serialize_agg_row(const ObDatumRow &agg_row, const char *&row_buf, int64_t &row_size); void get_cmp_func(const ObColDesc &col_desc, ObStorageDatumCmpFunc &cmp_func); @@ -182,13 +182,14 @@ void TestIndexBlockAggregator::update_min_max_row(const ObDatumRow &row) } void TestIndexBlockAggregator::validate_agg_row( - const ObDatumRow &datum_row, int64_t nop_col_cnt, int64_t *nop_col_idxs) + const ObDatumRow &datum_row, int64_t nop_col_cnt, int64_t *nop_col_idxs, ObSkipIndexColType *nop_col_types) { for (int64_t i = 0; i < full_agg_metas_.count(); ++i) { ObSkipIndexColMeta idx_meta = full_agg_metas_.at(i); const int64_t col_idx = idx_meta.col_idx_; - bool is_nop_column = is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs); - if (is_nop_column) { + int64_t index = 0; + bool is_nop_column = is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs, index); + if (is_nop_column && ((nop_col_types == nullptr ) || (nop_col_types != nullptr && nop_col_types[index] == idx_meta.col_type_))) { ASSERT_TRUE(datum_row.storage_datums_[i].is_nop()); } else if (datum_row.storage_datums_[i].is_nop() || datum_row.storage_datums_[i].is_null()) { // skip for not aggregate data ASSERT_TRUE(min_row_.storage_datums_[col_idx].is_null()); @@ -223,25 +224,33 @@ void TestIndexBlockAggregator::validate_agg_row( } void TestIndexBlockAggregator::set_nop_cols( - ObDatumRow &row, int64_t nop_col_cnt, int64_t *nop_col_idxs) + ObDatumRow &row, int64_t nop_col_cnt, int64_t *nop_col_idxs, ObSkipIndexColType *nop_col_types) { for (int64_t i = 0; i < full_agg_metas_.count(); ++i) { ASSERT_TRUE(i < row.get_column_count()); ObSkipIndexColMeta idx_meta = full_agg_metas_.at(i); const int64_t col_idx = idx_meta.col_idx_; - if (is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs)) { - row.storage_datums_[i].set_nop(); + int64_t index = 0; + if (is_col_in_nop_col_arr(col_idx, nop_col_cnt, nop_col_idxs, index)) { + if (nop_col_types != nullptr) { + if (nop_col_types[index] == idx_meta.col_type_) { + row.storage_datums_[i].set_nop(); + } + } else { + row.storage_datums_[i].set_nop(); + } } } } bool TestIndexBlockAggregator::is_col_in_nop_col_arr( - const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs) + const int64_t col_idx, const int64_t nop_col_cnt, int64_t *nop_col_idxs, int64_t &index) { bool is_nop_column = false; for (int64_t i = 0; i < nop_col_cnt; ++i) { if (col_idx == nop_col_idxs[i]) { is_nop_column = true; + index = i; break; } } @@ -310,14 +319,14 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate) ASSERT_EQ(OB_SUCCESS, data_aggregator.eval(generate_row)); ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row)); ASSERT_TRUE(nullptr != data_agg_row); - if (0 == test_row_cnt / 2) { + if (0 == i / 2) { ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row)); } else { const char *row_buf = nullptr; int64_t row_size = 0; serialize_agg_row(*data_agg_row, row_buf, row_size); ASSERT_TRUE(nullptr != row_buf); - ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size)); + ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i)); } ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row)); ASSERT_TRUE(nullptr != index_agg_row); @@ -348,7 +357,15 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate) ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row)); ASSERT_TRUE(nullptr != data_agg_row); set_nop_cols(*const_cast(data_agg_row), nop_col_cnt, nop_col_idxs); - ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row)); + if (0 == i / 2) { + ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row)); + } else { + const char *row_buf = nullptr; + int64_t row_size = 0; + serialize_agg_row(*data_agg_row, row_buf, row_size); + ASSERT_TRUE(nullptr != row_buf); + ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i)); + } ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row)); ASSERT_TRUE(nullptr != index_agg_row); update_min_max_row(generate_row); @@ -356,6 +373,39 @@ TEST_F(TestIndexBlockAggregator, basic_aggregate) validate_agg_row(*index_agg_row, nop_col_cnt, nop_col_idxs); } + // test null index row + ObSkipIndexColType nop_col_types[3] = {SK_IDX_MAX, SK_IDX_MIN, SK_IDX_NULL_COUNT}; + reset_min_max_row(); + data_agg_result.reuse(); + index_agg_result.reuse(); + data_aggregator.reuse(); + index_aggregator.reuse(); + data_agg_row = nullptr; + index_agg_row = nullptr; + for (int64_t i = 0; i < test_row_cnt; ++i) { + const int64_t seed = random() % test_row_cnt; + generate_row_by_seed(seed, generate_row); + ASSERT_EQ(OB_SUCCESS, data_aggregator.eval(generate_row)); + ASSERT_EQ(OB_SUCCESS, data_aggregator.get_aggregated_row(data_agg_row)); + ASSERT_TRUE(nullptr != data_agg_row); + set_nop_cols(*const_cast(data_agg_row), nop_col_cnt, nop_col_idxs, nop_col_types); + if (0 == i / 2) { + ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(*data_agg_row)); + } else { + const char *row_buf = nullptr; + int64_t row_size = 0; + serialize_agg_row(*data_agg_row, row_buf, row_size); + ASSERT_TRUE(nullptr != row_buf); + ASSERT_EQ(OB_SUCCESS, index_aggregator.eval(row_buf, row_size, i)); + } + ASSERT_EQ(OB_SUCCESS, index_aggregator.get_aggregated_row(index_agg_row)); + ASSERT_TRUE(nullptr != index_agg_row); + update_min_max_row(generate_row); + validate_agg_row(*data_agg_row, nop_col_cnt, nop_col_idxs, nop_col_types); + validate_agg_row(*index_agg_row, nop_col_cnt, nop_col_idxs, nop_col_types); + } + + // test reuse reset_min_max_row(); data_agg_result.reuse();