[refactor](non-vec) remove non vec code for indexed column reader (#15409)

This commit is contained in:
Xin Liao
2022-12-30 23:01:54 +08:00
committed by GitHub
parent 9bba2f4cde
commit cc7a9d92ad
13 changed files with 228 additions and 65 deletions

View File

@ -275,5 +275,24 @@ Status BinaryPrefixPageDecoder::next_batch(size_t* n, ColumnBlockView* dst) {
return Status::OK();
}
Status BinaryPrefixPageDecoder::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
DCHECK(_parsed);
if (PREDICT_FALSE(*n == 0 || _cur_pos >= _num_values)) {
*n = 0;
return Status::OK();
}
size_t max_fetch = std::min(*n, static_cast<size_t>(_num_values - _cur_pos));
// read and copy values
for (size_t i = 0; i < max_fetch; ++i) {
dst->insert_data((char*)(_current_value.data()), _current_value.size());
_read_next_value();
_cur_pos++;
}
*n = max_fetch;
return Status::OK();
}
} // namespace segment_v2
} // namespace doris

View File

@ -113,9 +113,7 @@ public:
Status next_batch(size_t* n, ColumnBlockView* dst) override;
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) override {
return Status::NotSupported("binary prefix page not implement vec op now");
};
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) override;
size_t count() const override {
DCHECK(_parsed);
@ -173,4 +171,4 @@ private:
};
} // namespace segment_v2
} // namespace doris
} // namespace doris

View File

@ -18,6 +18,7 @@
#include "olap/rowset/segment_v2/bitmap_index_reader.h"
#include "olap/types.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
namespace segment_v2 {
@ -49,18 +50,16 @@ Status BitmapIndexIterator::read_bitmap(rowid_t ordinal, roaring::Roaring* resul
DCHECK(0 <= ordinal && ordinal < _reader->bitmap_nums());
size_t num_to_read = 1;
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(
ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb));
ColumnBlock block(cvb.get(), _pool.get());
ColumnBlockView column_block_view(&block);
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
_reader->type_info()->type(), 1, 0);
auto column = data_type->create_column();
RETURN_IF_ERROR(_bitmap_column_iter.seek_to_ordinal(ordinal));
size_t num_read = num_to_read;
RETURN_IF_ERROR(_bitmap_column_iter.next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(_bitmap_column_iter.next_batch(&num_read, column));
DCHECK(num_to_read == num_read);
*result = roaring::Roaring::read(reinterpret_cast<const Slice*>(block.data())->data, false);
*result = roaring::Roaring::read(column->get_data_at(0).data, false);
_pool->clear();
return Status::OK();
}

View File

@ -19,6 +19,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/types.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
namespace segment_v2 {
@ -39,24 +40,22 @@ Status BloomFilterIndexReader::new_iterator(std::unique_ptr<BloomFilterIndexIter
Status BloomFilterIndexIterator::read_bloom_filter(rowid_t ordinal,
std::unique_ptr<BloomFilter>* bf) {
size_t num_to_read = 1;
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(
ColumnVectorBatch::create(num_to_read, false, _reader->type_info(), nullptr, &cvb));
ColumnBlock block(cvb.get(), _pool.get());
ColumnBlockView column_block_view(&block);
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
_reader->type_info()->type(), 1, 0);
auto column = data_type->create_column();
RETURN_IF_ERROR(_bloom_filter_iter.seek_to_ordinal(ordinal));
size_t num_read = num_to_read;
RETURN_IF_ERROR(_bloom_filter_iter.next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(_bloom_filter_iter.next_batch(&num_read, column));
DCHECK(num_to_read == num_read);
// construct bloom filter
const Slice* value_ptr = reinterpret_cast<const Slice*>(block.data());
BloomFilter::create(_reader->_bloom_filter_index_meta->algorithm(), bf, value_ptr->size);
RETURN_IF_ERROR((*bf)->init(value_ptr->data, value_ptr->size,
StringRef value = column->get_data_at(0);
BloomFilter::create(_reader->_bloom_filter_index_meta->algorithm(), bf, value.size);
RETURN_IF_ERROR((*bf)->init(value.data, value.size,
_reader->_bloom_filter_index_meta->hash_strategy()));
_pool->clear();
return Status::OK();
}
} // namespace segment_v2
} // namespace doris
} // namespace doris

View File

@ -258,5 +258,40 @@ Status IndexedColumnIterator::next_batch(size_t* n, ColumnBlockView* column_view
return Status::OK();
}
Status IndexedColumnIterator::next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
DCHECK(_seeked);
if (_current_ordinal == _reader->num_values()) {
*n = 0;
return Status::OK();
}
size_t remaining = *n;
while (remaining > 0) {
if (!_data_page.has_remaining()) {
// trying to read next data page
if (!_reader->_has_index_page) {
break; // no more data page
}
bool has_next = _current_iter->move_next();
if (!has_next) {
break; // no more data page
}
RETURN_IF_ERROR(_read_data_page(_current_iter->current_page_pointer()));
}
size_t rows_to_read = std::min(_data_page.remaining(), remaining);
size_t rows_read = rows_to_read;
RETURN_IF_ERROR(_data_page.data_decoder->next_batch(&rows_read, dst));
DCHECK(rows_to_read == rows_read);
_data_page.offset_in_page += rows_read;
_current_ordinal += rows_read;
remaining -= rows_read;
}
*n -= remaining;
_seeked = false;
return Status::OK();
}
} // namespace segment_v2
} // namespace doris

View File

@ -131,6 +131,9 @@ public:
// from Arena
Status next_batch(size_t* n, ColumnBlockView* column_view);
// After one seek, we can only call this function once to read data
Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst);
private:
Status _read_data_page(const PagePointer& pp);

View File

@ -82,9 +82,7 @@ public:
// allocated in the column_vector_view's mem_pool.
virtual Status next_batch(size_t* n, ColumnBlockView* dst) = 0;
virtual Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) {
return Status::NotSupported("not implement vec op now");
}
virtual Status next_batch(size_t* n, vectorized::MutableColumnPtr& dst) = 0;
virtual Status read_by_rowids(const rowid_t* rowids, ordinal_t page_first_ordinal, size_t* n,
vectorized::MutableColumnPtr& dst) {

View File

@ -36,6 +36,7 @@
#include "olap/tablet_schema.h"
#include "util/crc32c.h"
#include "util/slice.h" // Slice
#include "vec/data_types/data_type_factory.hpp"
#include "vec/olap/vgeneric_iterators.h"
namespace doris {
@ -316,20 +317,18 @@ Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) {
row_location->segment_id = _segment_id;
if (has_seq_col) {
MemPool pool;
size_t num_to_read = 1;
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, _pk_index_reader->type_info(),
nullptr, &cvb));
ColumnBlock block(cvb.get(), &pool);
ColumnBlockView column_block_view(&block);
auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
_pk_index_reader->type_info()->type(), 1, 0);
auto index_column = index_type->create_column();
size_t num_read = num_to_read;
RETURN_IF_ERROR(index_iterator->next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
DCHECK(num_to_read == num_read);
const Slice* sought_key = reinterpret_cast<const Slice*>(cvb->cell_ptr(0));
Slice sought_key =
Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
Slice sought_key_without_seq =
Slice(sought_key->get_data(), sought_key->get_size() - seq_col_length);
Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);
// compare key
if (key_without_seq.compare(sought_key_without_seq) != 0) {
@ -340,7 +339,7 @@ Status Segment::lookup_row_key(const Slice& key, RowLocation* row_location) {
Slice sequence_id =
Slice(key.get_data() + key_without_seq.get_size() + 1, seq_col_length - 1);
Slice previous_sequence_id = Slice(
sought_key->get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1);
sought_key.get_data() + sought_key_without_seq.get_size() + 1, seq_col_length - 1);
if (sequence_id.compare(previous_sequence_id) < 0) {
return Status::AlreadyExist("key with higher sequence id exists");
}

View File

@ -32,6 +32,7 @@
#include "util/doris_metrics.h"
#include "util/key_util.h"
#include "util/simd/bits.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_number.h"
#include "vec/exprs/vliteral.h"
@ -808,20 +809,18 @@ Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool
_segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
.length() +
1;
MemPool pool;
auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
_segment->_pk_index_reader->type_info()->type(), 1, 0);
auto index_column = index_type->create_column();
size_t num_to_read = 1;
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(ColumnVectorBatch::create(
num_to_read, false, _segment->_pk_index_reader->type_info(), nullptr, &cvb));
ColumnBlock block(cvb.get(), &pool);
ColumnBlockView column_block_view(&block);
size_t num_read = num_to_read;
RETURN_IF_ERROR(index_iterator->next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
DCHECK(num_to_read == num_read);
const Slice* sought_key = reinterpret_cast<const Slice*>(cvb->cell_ptr(0));
Slice sought_key =
Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
Slice sought_key_without_seq =
Slice(sought_key->get_data(), sought_key->get_size() - seq_col_length);
Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);
// compare key
if (Slice(index_key).compare(sought_key_without_seq) == 0) {

View File

@ -17,8 +17,6 @@
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/column_block.h"
#include "olap/olap_define.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/indexed_column_reader.h"
#include "olap/rowset/segment_v2/indexed_column_writer.h"
@ -144,19 +142,18 @@ Status ZoneMapIndexReader::load(bool use_page_cache, bool kept_in_memory) {
// read and cache all page zone maps
for (int i = 0; i < reader.num_values(); ++i) {
size_t num_to_read = 1;
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(
ColumnVectorBatch::create(num_to_read, false, reader.type_info(), nullptr, &cvb));
ColumnBlock block(cvb.get(), &pool);
ColumnBlockView column_block_view(&block);
// The type of reader is OLAP_FIELD_TYPE_OBJECT.
// ColumnBitmap will be created when using OLAP_FIELD_TYPE_OBJECT.
// But what we need actually is ColumnString.
vectorized::MutableColumnPtr column = vectorized::ColumnString::create();
RETURN_IF_ERROR(iter.seek_to_ordinal(i));
size_t num_read = num_to_read;
RETURN_IF_ERROR(iter.next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(iter.next_batch(&num_read, column));
DCHECK(num_to_read == num_read);
Slice* value = reinterpret_cast<Slice*>(cvb->data());
if (!_page_zone_maps[i].ParseFromArray(value->data, value->size)) {
if (!_page_zone_maps[i].ParseFromArray(column->get_data_at(0).data,
column->get_data_at(0).size)) {
return Status::Corruption("Failed to parse zone map");
}
pool.clear();

View File

@ -65,6 +65,7 @@
#include "util/scoped_cleanup.h"
#include "util/time.h"
#include "util/trace.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
using namespace ErrorCode;
@ -1926,36 +1927,34 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
bool exact_match = false;
std::string last_key;
int batch_size = 1024;
MemPool pool;
while (remaining > 0) {
std::unique_ptr<segment_v2::IndexedColumnIterator> iter;
RETURN_IF_ERROR(pk_idx->new_iterator(&iter));
size_t num_to_read = std::min(batch_size, remaining);
std::unique_ptr<ColumnVectorBatch> cvb;
RETURN_IF_ERROR(ColumnVectorBatch::create(num_to_read, false, pk_idx->type_info(),
nullptr, &cvb));
ColumnBlock block(cvb.get(), &pool);
ColumnBlockView column_block_view(&block);
auto index_type = vectorized::DataTypeFactory::instance().create_data_type(
pk_idx->type_info()->type(), 1, 0);
auto index_column = index_type->create_column();
Slice last_key_slice(last_key);
RETURN_IF_ERROR(iter->seek_at_or_after(&last_key_slice, &exact_match));
size_t num_read = num_to_read;
RETURN_IF_ERROR(iter->next_batch(&num_read, &column_block_view));
RETURN_IF_ERROR(iter->next_batch(&num_read, index_column));
DCHECK(num_to_read == num_read);
last_key = (reinterpret_cast<const Slice*>(cvb->cell_ptr(num_read - 1)))->to_string();
last_key = index_column->get_data_at(num_read - 1).to_string();
// exclude last_key, last_key will be read in next batch.
if (num_read == batch_size && num_read != remaining) {
num_read -= 1;
}
for (size_t i = 0; i < num_read; i++) {
const Slice* key = reinterpret_cast<const Slice*>(cvb->cell_ptr(i));
Slice key =
Slice(index_column->get_data_at(i).data, index_column->get_data_at(i).size);
RowLocation loc;
// first check if exist in pre segment
if (check_pre_segments) {
auto st = _check_pk_in_pre_segments(rowset_id, pre_segments, *key,
dummy_version, delete_bitmap, &loc);
auto st = _check_pk_in_pre_segments(rowset_id, pre_segments, key, dummy_version,
delete_bitmap, &loc);
if (st.ok()) {
delete_bitmap->add({rowset_id, loc.segment_id, dummy_version.first},
loc.row_id);
@ -1969,7 +1968,7 @@ Status Tablet::calc_delete_bitmap(RowsetId rowset_id,
}
if (specified_rowset_ids != nullptr && !specified_rowset_ids->empty()) {
auto st = lookup_row_key(*key, specified_rowset_ids, &loc,
auto st = lookup_row_key(key, specified_rowset_ids, &loc,
dummy_version.first - 1);
CHECK(st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>());
if (st.is<NOT_FOUND>()) {

View File

@ -124,6 +124,10 @@ public:
return create_data_type(TypeDescriptor::from_thrift(raw_type), raw_type.is_nullable);
}
DataTypePtr create_data_type(const FieldType& type, int precision, int scale) {
return _create_primitive_data_type(type, precision, scale);
}
private:
DataTypePtr _create_primitive_data_type(const FieldType& type, int precision, int scale) const;

View File

@ -29,6 +29,7 @@
#include "olap/types.h"
#include "runtime/mem_pool.h"
#include "util/debug_util.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris {
using namespace ErrorCode;
@ -150,6 +151,118 @@ public:
EXPECT_TRUE(!exact_match);
}
void test_encode_and_decode_vec() {
std::vector<std::string> test_data;
for (int i = 1000; i < 1038; ++i) {
test_data.emplace_back(std::to_string(i));
}
std::vector<Slice> slices;
for (const auto& data : test_data) {
slices.emplace_back(Slice(data));
}
// encode
PageBuilderOptions options;
BinaryPrefixPageBuilder page_builder(options);
size_t count = slices.size();
const Slice* ptr = &slices[0];
Status ret = page_builder.add(reinterpret_cast<const uint8_t*>(ptr), &count);
OwnedSlice dict_slice = page_builder.finish();
EXPECT_EQ(slices.size(), page_builder.count());
EXPECT_FALSE(page_builder.is_page_full());
//check first value and last value
Slice first_value;
page_builder.get_first_value(&first_value);
EXPECT_EQ(slices[0], first_value);
Slice last_value;
page_builder.get_last_value(&last_value);
EXPECT_EQ(slices[count - 1], last_value);
PageDecoderOptions dict_decoder_options;
std::unique_ptr<BinaryPrefixPageDecoder> page_decoder(
new BinaryPrefixPageDecoder(dict_slice.slice(), dict_decoder_options));
ret = page_decoder->init();
EXPECT_TRUE(ret.ok());
// because every slice is unique
EXPECT_EQ(slices.size(), page_decoder->count());
auto type_info = get_scalar_type_info(OLAP_FIELD_TYPE_VARCHAR);
size_t size = slices.size();
{
//check values
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
type_info->type(), 1, 0);
auto column = data_type->create_column();
ret = page_decoder->next_batch(&size, column);
EXPECT_TRUE(ret.ok());
EXPECT_EQ(slices.size(), size);
for (int i = 1000; i < 1038; ++i) {
EXPECT_EQ(std::to_string(i), column->get_data_at(i - 1000).to_string());
}
}
{
ret = page_decoder->seek_to_position_in_page(0);
EXPECT_TRUE(ret.ok());
int n = 0;
while (true) {
//check values
MemPool pool;
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
type_info->type(), 1, 0);
auto column = data_type->create_column();
size_t size = 6;
ret = page_decoder->next_batch(&size, column);
EXPECT_TRUE(ret.ok());
if (size == 0) {
break;
}
for (int i = 0; i < size; ++i) {
EXPECT_EQ(std::to_string(1000 + 6 * n + i), column->get_data_at(i).to_string());
}
n++;
}
}
{
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(
type_info->type(), 1, 0);
auto column = data_type->create_column();
ret = page_decoder->seek_to_position_in_page(15);
EXPECT_TRUE(ret.ok());
ret = page_decoder->next_batch(&size, column);
EXPECT_TRUE(ret.ok());
EXPECT_EQ(23, size);
for (int i = 1015; i < 1038; ++i) {
EXPECT_EQ(std::to_string(i), column->get_data_at(i - 1015).to_string());
}
}
Slice v1 = Slice("1039");
bool exact_match;
ret = page_decoder->seek_at_or_after_value(&v1, &exact_match);
EXPECT_TRUE(ret.is<NOT_FOUND>());
Slice v2 = Slice("1000");
ret = page_decoder->seek_at_or_after_value(&v2, &exact_match);
EXPECT_TRUE(ret.ok());
EXPECT_TRUE(exact_match);
Slice v3 = Slice("1037");
ret = page_decoder->seek_at_or_after_value(&v3, &exact_match);
EXPECT_TRUE(ret.ok());
EXPECT_TRUE(exact_match);
Slice v4 = Slice("100");
ret = page_decoder->seek_at_or_after_value(&v4, &exact_match);
EXPECT_TRUE(ret.ok());
EXPECT_TRUE(!exact_match);
}
void test_encode_and_decode2() {
std::vector<std::string> test_data;
test_data.push_back("ab");
@ -185,6 +298,7 @@ public:
TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode) {
test_encode_and_decode();
test_encode_and_decode_vec();
}
TEST_F(BinaryPrefixPageTest, TestEncodeAndDecode2) {