diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp index a80d4e6620..8ced998f57 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.cpp @@ -275,5 +275,24 @@ Status BinaryPrefixPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) { return Status::OK(); } +Status BinaryPrefixPageDecoder::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { + DCHECK(_parsed); + if (PREDICT_FALSE(*n == 0 || _cur_pos >= _num_values)) { + *n = 0; + return Status::OK(); + } + size_t max_fetch = std::min(*n, static_cast(_num_values - _cur_pos)); + + // read and copy values + for (size_t i = 0; i < max_fetch; ++i) { + dst->insert_data((char*)(_current_value.data()), _current_value.size()); + _read_next_value(); + _cur_pos++; + } + + *n = max_fetch; + return Status::OK(); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/binary_prefix_page.h b/be/src/olap/rowset/segment_v2/binary_prefix_page.h index bec5f3f7b4..53e966c191 100644 --- a/be/src/olap/rowset/segment_v2/binary_prefix_page.h +++ b/be/src/olap/rowset/segment_v2/binary_prefix_page.h @@ -113,9 +113,7 @@ public: Status next_batch(size_t* n, ColumnBlockView* dst) override; - Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) override { - return Status::NotSupported("binary prefix page not implement vec op now"); - }; + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) override; size_t count() const override { DCHECK(_parsed); @@ -173,4 +171,4 @@ private: }; } // namespace segment_v2 -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp index eb108f8e8d..ce8e164d58 100644 --- a/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/bitmap_index_reader.cpp @@ -18,6 +18,7 @@ #include "olap/rowset/segment_v2/bitmap_index_reader.h" #include "olap/types.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { namespace segment_v2 { @@ -49,18 +50,16 @@ Status BitmapIndexIterator::read_bitmap(rowid_t ordinal, roaring::Roaring* resul DCHECK(0 <= ordinal && ordinal < _reader->bitmap_nums()); size_t num_to_read = 1; - std::unique_ptr cvb; - RETURN_IF_ERROR( - ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb)); - ColumnBlock block(cvb.get(), _pool.get()); - ColumnBlockView column_block_view(&block); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + _reader->type_info()->type(), 1, 0); + auto column = data_type->create_column(); RETURN_IF_ERROR(_bitmap_column_iter.seek_to_ordinal(ordinal)); size_t num_read = num_to_read; - RETURN_IF_ERROR(_bitmap_column_iter.next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(_bitmap_column_iter.next_batch(&num_read, column)); DCHECK(num_to_read == num_read); - *result = roaring::Roaring::read(reinterpret_cast(block.data())->data, false); + *result = roaring::Roaring::read(column->get_data_at(0).data, false); _pool->clear(); return Status::OK(); } diff --git a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp index 1f6a733776..cb94f029b0 100644 --- a/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp +++ b/be/src/olap/rowset/segment_v2/bloom_filter_index_reader.cpp @@ -19,6 +19,7 @@ #include "olap/rowset/segment_v2/bloom_filter.h" #include "olap/types.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { namespace segment_v2 { @@ -39,24 +40,22 @@ Status BloomFilterIndexReader::new_iterator(std::unique_ptr* bf) { size_t num_to_read = 1; - std::unique_ptr cvb; - RETURN_IF_ERROR( - ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb)); - ColumnBlock block(cvb.get(), _pool.get()); - ColumnBlockView column_block_view(&block); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + _reader->type_info()->type(), 1, 0); + auto column = data_type->create_column(); RETURN_IF_ERROR(_bloom_filter_iter.seek_to_ordinal(ordinal)); size_t num_read = num_to_read; - RETURN_IF_ERROR(_bloom_filter_iter.next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(_bloom_filter_iter.next_batch(&num_read, column)); DCHECK(num_to_read == num_read); // construct bloom filter - const Slice* value_ptr = reinterpret_cast(block.data()); - BloomFilter::create(_reader->_bloom_filter_index_meta->algorithm(), bf, value_ptr->size); - RETURN_IF_ERROR((*bf)->init(value_ptr->data, value_ptr->size, + StringRef value = column->get_data_at(0); + BloomFilter::create(_reader->_bloom_filter_index_meta->algorithm(), bf, value.size); + RETURN_IF_ERROR((*bf)->init(value.data, value.size, _reader->_bloom_filter_index_meta->hash_strategy())); _pool->clear(); return Status::OK(); } } // namespace segment_v2 -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp index 424e1b7141..bac8b7b51a 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.cpp @@ -258,5 +258,40 @@ Status IndexedColumnIterator::next_batch(size_t* n, ColumnBlockView* column_view return Status::OK(); } +Status IndexedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { + DCHECK(_seeked); + if (_current_ordinal == _reader->num_values()) { + *n = 0; + return Status::OK(); + } + + size_t remaining = *n; + while (remaining > 0) { + if (!_data_page.has_remaining()) { + // trying to read next data page + if (!_reader->_has_index_page) { + break; // no more data page + } + bool has_next = _current_iter->move_next(); + if (!has_next) { + break; // no more data page + } + RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer())); + } + + size_t rows_to_read = std::min(_data_page.remaining(), remaining); + size_t rows_read = rows_to_read; + RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst)); + DCHECK(rows_to_read == rows_read); + + _data_page.offset_in_page += rows_read; + _current_ordinal += rows_read; + remaining -= rows_read; + } + *n -= remaining; + _seeked = false; + return Status::OK(); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/indexed_column_reader.h b/be/src/olap/rowset/segment_v2/indexed_column_reader.h index 3546fe728e..c7e46d2bf2 100644 --- a/be/src/olap/rowset/segment_v2/indexed_column_reader.h +++ b/be/src/olap/rowset/segment_v2/indexed_column_reader.h @@ -131,6 +131,9 @@ public: // from Arena Status next_batch(size_t* n, ColumnBlockView* column_view); + // After one seek, we can only call this function once to read data + Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst); + private: Status _read_data_page(const PagePointer& pp); diff --git a/be/src/olap/rowset/segment_v2/page_decoder.h b/be/src/olap/rowset/segment_v2/page_decoder.h index 7c817a08ac..9207825de3 100644 --- a/be/src/olap/rowset/segment_v2/page_decoder.h +++ b/be/src/olap/rowset/segment_v2/page_decoder.h @@ -82,9 +82,7 @@ public: // allocated in the column_vector_view's mem_pool. virtual Status next_batch(size_t* n, ColumnBlockView* dst) = 0; - virtual Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) { - return Status::NotSupported("not implement vec op now"); - } + virtual Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) = 0; virtual Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n, vectorized::MutableColumnPtr& dst) { diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 76b1c82a0c..afe1b14890 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -36,6 +36,7 @@ #include "olap/tablet_schema.h" #include "util/crc32c.h" #include "util/slice.h" // Slice +#include "vec/data_types/data_type_factory.hpp" #include "vec/olap/vgeneric_iterators.h" namespace doris { @@ -316,20 +317,18 @@ Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) { row_location->segment_id = _segment_id; if (has_seq_col) { - MemPool pool; size_t num_to_read = 1; - std::unique_ptr cvb; - RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, _pk_index_reader->type_info(), - nullptr, &cvb)); - ColumnBlock block(cvb.get(), &pool); - ColumnBlockView column_block_view(&block); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + _pk_index_reader->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); size_t num_read = num_to_read; - RETURN_IF_ERROR(index_iterator->next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column)); DCHECK(num_to_read == num_read); - const Slice* sought_key = reinterpret_cast(cvb->cell_ptr(0)); + Slice sought_key = + Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size); Slice sought_key_without_seq = - Slice(sought_key->get_data(), sought_key->get_size() - seq_col_length); + Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length); // compare key if (key_without_seq.compare(sought_key_without_seq) != 0) { @@ -340,7 +339,7 @@ Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) { Slice sequence_id = Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1); Slice previous_sequence_id = Slice( - sought_key->get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1); + sought_key.get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1); if (sequence_id.compare(previous_sequence_id) < 0) { return Status::AlreadyExist("key with higher sequence id exists"); } diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp index f6da5ce4be..3ca445cf77 100644 --- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp +++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp @@ -32,6 +32,7 @@ #include "util/doris_metrics.h" #include "util/key_util.h" #include "util/simd/bits.h" +#include "vec/data_types/data_type_factory.hpp" #include "vec/data_types/data_type_number.h" #include "vec/exprs/vliteral.h" @@ -808,20 +809,18 @@ Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx()) .length() + 1; - MemPool pool; + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + _segment->_pk_index_reader->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); size_t num_to_read = 1; - std::unique_ptr cvb; - RETURN_IF_ERROR(ColumnVectorBatch::create( - num_to_read, false, _segment->_pk_index_reader->type_info(), nullptr, &cvb)); - ColumnBlock block(cvb.get(), &pool); - ColumnBlockView column_block_view(&block); size_t num_read = num_to_read; - RETURN_IF_ERROR(index_iterator->next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column)); DCHECK(num_to_read == num_read); - const Slice* sought_key = reinterpret_cast(cvb->cell_ptr(0)); + Slice sought_key = + Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size); Slice sought_key_without_seq = - Slice(sought_key->get_data(), sought_key->get_size() - seq_col_length); + Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length); // compare key if (Slice(index_key).compare(sought_key_without_seq) == 0) { diff --git a/be/src/olap/rowset/segment_v2/zone_map_index.cpp b/be/src/olap/rowset/segment_v2/zone_map_index.cpp index c6ed824b4d..ea21db3fbe 100644 --- a/be/src/olap/rowset/segment_v2/zone_map_index.cpp +++ b/be/src/olap/rowset/segment_v2/zone_map_index.cpp @@ -17,8 +17,6 @@ #include "olap/rowset/segment_v2/zone_map_index.h" -#include "olap/column_block.h" -#include "olap/olap_define.h" #include "olap/rowset/segment_v2/encoding_info.h" #include "olap/rowset/segment_v2/indexed_column_reader.h" #include "olap/rowset/segment_v2/indexed_column_writer.h" @@ -144,19 +142,18 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) { // read and cache all page zone maps for (int i = 0; i < reader.num_values(); ++i) { size_t num_to_read = 1; - std::unique_ptr cvb; - RETURN_IF_ERROR( - ColumnVectorBatch::create(num_to_read, false, reader.type_info(), nullptr, &cvb)); - ColumnBlock block(cvb.get(), &pool); - ColumnBlockView column_block_view(&block); + // The type of reader is OLAP_FIELD_TYPE_OBJECT. + // ColumnBitmap will be created when using OLAP_FIELD_TYPE_OBJECT. + // But what we need actually is ColumnString. + vectorized::MutableColumnPtr column = vectorized::ColumnString::create(); RETURN_IF_ERROR(iter.seek_to_ordinal(i)); size_t num_read = num_to_read; - RETURN_IF_ERROR(iter.next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(iter.next_batch(&num_read, column)); DCHECK(num_to_read == num_read); - Slice* value = reinterpret_cast(cvb->data()); - if (!_page_zone_maps[i].ParseFromArray(value->data, value->size)) { + if (!_page_zone_maps[i].ParseFromArray(column->get_data_at(0).data, + column->get_data_at(0).size)) { return Status::Corruption("Failed to parse zone map"); } pool.clear(); diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index d1c3ac4029..f54a8f567c 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -65,6 +65,7 @@ #include "util/scoped_cleanup.h" #include "util/time.h" #include "util/trace.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { using namespace ErrorCode; @@ -1926,36 +1927,34 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id, bool exact_match = false; std::string last_key; int batch_size = 1024; - MemPool pool; while (remaining > 0) { std::unique_ptr iter; RETURN_IF_ERROR(pk_idx->new_iterator(&iter)); size_t num_to_read = std::min(batch_size, remaining); - std::unique_ptr cvb; - RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(), - nullptr, &cvb)); - ColumnBlock block(cvb.get(), &pool); - ColumnBlockView column_block_view(&block); + auto index_type = vectorized::DataTypeFactory::instance().create_data_type( + pk_idx->type_info()->type(), 1, 0); + auto index_column = index_type->create_column(); Slice last_key_slice(last_key); RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match)); size_t num_read = num_to_read; - RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view)); + RETURN_IF_ERROR(iter->next_batch(&num_read, index_column)); DCHECK(num_to_read == num_read); - last_key = (reinterpret_cast(cvb->cell_ptr(num_read - 1)))->to_string(); + last_key = index_column->get_data_at(num_read - 1).to_string(); // exclude last_key, last_key will be read in next batch. if (num_read == batch_size && num_read != remaining) { num_read -= 1; } for (size_t i = 0; i < num_read; i++) { - const Slice* key = reinterpret_cast(cvb->cell_ptr(i)); + Slice key = + Slice(index_column->get_data_at(i).data, index_column->get_data_at(i).size); RowLocation loc; // first check if exist in pre segment if (check_pre_segments) { - auto st = _check_pk_in_pre_segments(rowset_id, pre_segments, *key, - dummy_version, delete_bitmap, &loc); + auto st = _check_pk_in_pre_segments(rowset_id, pre_segments, key, dummy_version, + delete_bitmap, &loc); if (st.ok()) { delete_bitmap->add({rowset_id, loc.segment_id, dummy_version.first}, loc.row_id); @@ -1969,7 +1968,7 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id, } if (specified_rowset_ids != nullptr && !specified_rowset_ids->empty()) { - auto st = lookup_row_key(*key, specified_rowset_ids, &loc, + auto st = lookup_row_key(key, specified_rowset_ids, &loc, dummy_version.first - 1); CHECK(st.ok() || st.is() || st.is()); if (st.is()) { diff --git a/be/src/vec/data_types/data_type_factory.hpp b/be/src/vec/data_types/data_type_factory.hpp index 696ab9ee54..ed270b40ea 100644 --- a/be/src/vec/data_types/data_type_factory.hpp +++ b/be/src/vec/data_types/data_type_factory.hpp @@ -124,6 +124,10 @@ public: return create_data_type(TypeDescriptor::from_thrift(raw_type), raw_type.is_nullable); } + DataTypePtr create_data_type(const FieldType& type, int precision, int scale) { + return _create_primitive_data_type(type, precision, scale); + } + private: DataTypePtr _create_primitive_data_type(const FieldType& type, int precision, int scale) const; diff --git a/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp index 6db57f19a7..f7d6b63ce1 100644 --- a/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp +++ b/be/test/olap/rowset/segment_v2/binary_prefix_page_test.cpp @@ -29,6 +29,7 @@ #include "olap/types.h" #include "runtime/mem_pool.h" #include "util/debug_util.h" +#include "vec/data_types/data_type_factory.hpp" namespace doris { using namespace ErrorCode; @@ -150,6 +151,118 @@ public: EXPECT_TRUE(!exact_match); } + void test_encode_and_decode_vec() { + std::vector test_data; + for (int i = 1000; i < 1038; ++i) { + test_data.emplace_back(std::to_string(i)); + } + std::vector slices; + for (const auto& data : test_data) { + slices.emplace_back(Slice(data)); + } + // encode + PageBuilderOptions options; + BinaryPrefixPageBuilder page_builder(options); + + size_t count = slices.size(); + const Slice* ptr = &slices[0]; + Status ret = page_builder.add(reinterpret_cast(ptr), &count); + + OwnedSlice dict_slice = page_builder.finish(); + EXPECT_EQ(slices.size(), page_builder.count()); + EXPECT_FALSE(page_builder.is_page_full()); + + //check first value and last value + Slice first_value; + page_builder.get_first_value(&first_value); + EXPECT_EQ(slices[0], first_value); + Slice last_value; + page_builder.get_last_value(&last_value); + EXPECT_EQ(slices[count - 1], last_value); + + PageDecoderOptions dict_decoder_options; + std::unique_ptr page_decoder( + new BinaryPrefixPageDecoder(dict_slice.slice(), dict_decoder_options)); + ret = page_decoder->init(); + EXPECT_TRUE(ret.ok()); + // because every slice is unique + EXPECT_EQ(slices.size(), page_decoder->count()); + auto type_info = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR); + size_t size = slices.size(); + + { + //check values + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + type_info->type(), 1, 0); + auto column = data_type->create_column(); + + ret = page_decoder->next_batch(&size, column); + EXPECT_TRUE(ret.ok()); + EXPECT_EQ(slices.size(), size); + for (int i = 1000; i < 1038; ++i) { + EXPECT_EQ(std::to_string(i), column->get_data_at(i - 1000).to_string()); + } + } + + { + ret = page_decoder->seek_to_position_in_page(0); + EXPECT_TRUE(ret.ok()); + int n = 0; + while (true) { + //check values + MemPool pool; + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + type_info->type(), 1, 0); + auto column = data_type->create_column(); + size_t size = 6; + ret = page_decoder->next_batch(&size, column); + EXPECT_TRUE(ret.ok()); + if (size == 0) { + break; + } + for (int i = 0; i < size; ++i) { + EXPECT_EQ(std::to_string(1000 + 6 * n + i), column->get_data_at(i).to_string()); + } + n++; + } + } + + { + auto data_type = vectorized::DataTypeFactory::instance().create_data_type( + type_info->type(), 1, 0); + auto column = data_type->create_column(); + ret = page_decoder->seek_to_position_in_page(15); + EXPECT_TRUE(ret.ok()); + + ret = page_decoder->next_batch(&size, column); + EXPECT_TRUE(ret.ok()); + EXPECT_EQ(23, size); + for (int i = 1015; i < 1038; ++i) { + EXPECT_EQ(std::to_string(i), column->get_data_at(i - 1015).to_string()); + } + } + + Slice v1 = Slice("1039"); + bool exact_match; + ret = page_decoder->seek_at_or_after_value(&v1, &exact_match); + EXPECT_TRUE(ret.is()); + + Slice v2 = Slice("1000"); + ret = page_decoder->seek_at_or_after_value(&v2, &exact_match); + EXPECT_TRUE(ret.ok()); + EXPECT_TRUE(exact_match); + + Slice v3 = Slice("1037"); + ret = page_decoder->seek_at_or_after_value(&v3, &exact_match); + EXPECT_TRUE(ret.ok()); + EXPECT_TRUE(exact_match); + + Slice v4 = Slice("100"); + ret = page_decoder->seek_at_or_after_value(&v4, &exact_match); + EXPECT_TRUE(ret.ok()); + EXPECT_TRUE(!exact_match); + } + void test_encode_and_decode2() { std::vector test_data; test_data.push_back("ab"); @@ -185,6 +298,7 @@ public: TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode) { test_encode_and_decode(); + test_encode_and_decode_vec(); } TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode2) {