Support segment-level zone map (#1931)

This commit is contained in:
Lijia Liu
2019-10-13 22:06:09 +08:00
committed by ZHAO Chun
parent 7eece1e9e2
commit d68b1b287c
13 changed files with 264 additions and 54 deletions

View File

@ -93,6 +93,7 @@ add_library(Olap STATIC
rowset/segment_v2/binary_dict_page.cpp
rowset/segment_v2/segment.cpp
rowset/segment_v2/segment_iterator.cpp
rowset/segment_v2/empty_segment_iterator.cpp
rowset/segment_v2/segment_writer.cpp
rowset/segment_v2/column_zone_map.cpp
task/engine_batch_load_task.cpp

View File

@ -54,9 +54,15 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
}
// create iterator for each segment
std::vector<std::unique_ptr<segment_v2::SegmentIterator>> seg_iterators;
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
for (auto& seg_ptr : _rowset->_segments) {
seg_iterators.push_back(seg_ptr->new_iterator(schema, read_options));
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(schema, read_options, &iter);
if (!s.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
return OLAP_ERR_ROWSET_READER_INIT;
}
seg_iterators.push_back(std::move(iter));
}
std::vector<RowwiseIterator*> iterators;
for (auto& owned_it : seg_iterators) {

View File

@ -247,6 +247,7 @@ void ColumnWriter::write_meta(ColumnMetaPB* meta) {
_ordinal_index_pp.to_proto(meta->mutable_ordinal_index_page());
if (_opts.need_zone_map) {
_zone_map_pp.to_proto(meta->mutable_zone_map_page());
_column_zone_map_builder->fill_segment_zone_map(meta->mutable_zone_map());
}
if (_encoding_info->encoding() == DICT_ENCODING) {
_dict_page_pp.to_proto(meta->mutable_dict_page());

View File

@ -29,8 +29,10 @@ ColumnZoneMapBuilder::ColumnZoneMapBuilder(Field* field) : _field(field) {
_page_builder.reset(new BinaryPlainPageBuilder(options));
_zone_map.min_value = _field->allocate_value_from_arena(&_arena);
_zone_map.max_value = _field->allocate_value_from_arena(&_arena);
_reset_zone_map();
_reset_page_zone_map();
_segment_zone_map.min_value = _field->allocate_value_from_arena(&_arena);
_segment_zone_map.max_value = _field->allocate_value_from_arena(&_arena);
_reset_segment_zone_map();
}
Status ColumnZoneMapBuilder::add(const uint8_t *vals, size_t count) {
@ -56,12 +58,28 @@ Status ColumnZoneMapBuilder::add(const uint8_t *vals, size_t count) {
return Status::OK();
}
void ColumnZoneMapBuilder::fill_segment_zone_map(ZoneMapPB* const to) {
_fill_zone_map_to_pb(_segment_zone_map, to);
}
Status ColumnZoneMapBuilder::flush() {
// Update segment zone map.
if (_field->compare(_segment_zone_map.min_value, _zone_map.min_value) > 0) {
_field->direct_copy_content(_segment_zone_map.min_value, _zone_map.min_value);
}
if (_field->compare(_segment_zone_map.max_value, _zone_map.max_value) < 0) {
_field->direct_copy_content(_segment_zone_map.max_value, _zone_map.max_value);
}
if (!_segment_zone_map.has_null && _zone_map.has_null) {
_segment_zone_map.has_null = true;
}
if (!_segment_zone_map.has_not_null && _zone_map.has_not_null) {
_segment_zone_map.has_not_null = true;
}
ZoneMapPB page_zone_map;
page_zone_map.set_min(_field->to_string(_zone_map.min_value));
page_zone_map.set_max(_field->to_string(_zone_map.max_value));
page_zone_map.set_has_null(_zone_map.has_null);
page_zone_map.set_has_not_null(_zone_map.has_not_null);
_fill_zone_map_to_pb(_zone_map, &page_zone_map);
std::string serialized_zone_map;
bool ret = page_zone_map.SerializeToString(&serialized_zone_map);
if (!ret) {
@ -72,15 +90,22 @@ Status ColumnZoneMapBuilder::flush() {
RETURN_IF_ERROR(_page_builder->add((const uint8_t *)&data, &num));
// reset the variables
// we should allocate max varchar length and set to max for min value
_reset_zone_map();
_reset_page_zone_map();
return Status::OK();
}
void ColumnZoneMapBuilder::_reset_zone_map() {
_field->set_to_max(_zone_map.min_value);
_field->set_to_min(_zone_map.max_value);
_zone_map.has_null = false;
_zone_map.has_not_null = false;
void ColumnZoneMapBuilder::_reset_zone_map(ZoneMap* zone_map) {
_field->set_to_max(zone_map->min_value);
_field->set_to_min(zone_map->max_value);
zone_map->has_null = false;
zone_map->has_not_null = false;
}
void ColumnZoneMapBuilder::_fill_zone_map_to_pb(const ZoneMap& from, ZoneMapPB* const to) {
to->set_has_not_null(from.has_not_null);
to->set_has_null(from.has_null);
to->set_max(_field->to_string(from.max_value));
to->set_min(_field->to_string(from.min_value));
}
Status ColumnZoneMap::load() {

View File

@ -56,6 +56,8 @@ public:
Status flush();
void fill_segment_zone_map(ZoneMapPB* const to);
uint64_t size() {
return _page_builder->size();
}
@ -65,13 +67,17 @@ public:
}
private:
void _reset_zone_map();
void _reset_zone_map(ZoneMap* zone_map);
void _reset_page_zone_map() { _reset_zone_map(&_zone_map); }
void _reset_segment_zone_map() { _reset_zone_map(&_segment_zone_map); }
void _fill_zone_map_to_pb(const ZoneMap& from, ZoneMapPB* const to);
private:
std::unique_ptr<BinaryPlainPageBuilder> _page_builder;
Field* _field;
// memory will be managed by arena
ZoneMap _zone_map;
ZoneMap _segment_zone_map;
Arena _arena;
};

View File

@ -0,0 +1,31 @@
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/empty_segment_iterator.h"
#include "olap/row_block2.h"
namespace doris {
namespace segment_v2 {
EmptySegmentIterator::EmptySegmentIterator(const doris::Schema &schema): _schema(schema) {}
Status EmptySegmentIterator::next_batch(RowBlockV2* block) {
block->set_num_rows(0);
return Status::EndOfFile("no more data in segment");
}
} // namespace segment_v2
} // namespace doris

View File

@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/status.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/iterators.h"
#include "olap/schema.h"
namespace doris {
namespace segment_v2 {
class EmptySegmentIterator : public RowwiseIterator {
public:
explicit EmptySegmentIterator(const Schema& schema);
~EmptySegmentIterator() override {}
Status init(const StorageReadOptions& opts) override { return Status::OK(); }
const Schema& schema() const override { return _schema; }
Status next_batch(RowBlockV2* row_block) override;
private:
Schema _schema;
};
} // namespace segment_v2
} // namespace doris

View File

@ -23,6 +23,7 @@
#include "olap/rowset/segment_v2/column_reader.h" // ColumnReader
#include "olap/rowset/segment_v2/segment_writer.h" // k_segment_magic_length
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "olap/rowset/segment_v2/empty_segment_iterator.h"
#include "util/slice.h" // Slice
#include "olap/tablet_schema.h"
#include "util/crc32c.h"
@ -37,7 +38,8 @@ Segment::Segment(
const TabletSchema* tablet_schema)
: _fname(std::move(fname)),
_segment_id(segment_id),
_tablet_schema(tablet_schema) {
_tablet_schema(tablet_schema),
_index_loaded(false) {
}
Segment::~Segment() {
@ -50,17 +52,69 @@ Status Segment::open() {
RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file));
// parse footer to get meta
RETURN_IF_ERROR(_parse_footer());
// parse short key index
RETURN_IF_ERROR(_parse_index());
// initial all column reader
RETURN_IF_ERROR(_initial_column_readers());
return Status::OK();
}
std::unique_ptr<SegmentIterator> Segment::new_iterator(const Schema& schema, const StorageReadOptions& read_options) {
auto it = std::unique_ptr<SegmentIterator>(new SegmentIterator(this->shared_from_this(), schema));
it->init(read_options);
return it;
Status Segment::new_iterator(
const Schema& schema,
const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter) {
if (read_options.conditions != nullptr) {
for (auto& column_condition : read_options.conditions->columns()) {
int32_t column_id = column_condition.first;
auto entry = _column_id_to_footer_ordinal.find(column_id);
if (entry == _column_id_to_footer_ordinal.end()) {
continue;
}
auto& c_meta = _footer.columns(entry->second);
if (!c_meta.has_zone_map()) {
continue;
}
auto& c_zone_map = c_meta.zone_map();
if (!c_zone_map.has_not_null() && !c_zone_map.has_null()) {
// no data
iter->reset(new EmptySegmentIterator(schema));
return Status::OK();
}
// TODO Logic here and the similar logic in ColumnReader::_get_filtered_pages should be unified.
TypeInfo* type_info = get_type_info((FieldType)c_meta.type());
if (type_info == nullptr) {
return Status::NotSupported(Substitute("unsupported typeinfo, type=$0", c_meta.type()));
}
FieldType type = type_info->type();
std::unique_ptr<WrapperField> min_value(WrapperField::create_by_type(type));
std::unique_ptr<WrapperField> max_value(WrapperField::create_by_type(type));
if (c_zone_map.has_not_null()) {
min_value->from_string(c_zone_map.min());
max_value->from_string(c_zone_map.max());
}
if (c_zone_map.has_null()) {
min_value->set_null();
if (!c_zone_map.has_not_null()) {
max_value->set_null();
}
}
if (!column_condition.second->eval({min_value.get(), max_value.get()})) {
// any condition not satisfied, return.
iter->reset(new EmptySegmentIterator(schema));
return Status::OK();
}
}
}
if(!_index_loaded) {
// parse short key index
RETURN_IF_ERROR(_parse_index());
// initial all column reader
RETURN_IF_ERROR(_initial_column_readers());
_index_loaded = true;
}
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
iter->get()->init(read_options);
return Status::OK();
}
Status Segment::_parse_footer() {
@ -103,6 +157,11 @@ Status Segment::_parse_footer() {
if (!_footer.ParseFromString(footer_buf)) {
return Status::Corruption(Substitute("Bad segment file $0: failed to parse SegmentFooterPB", _fname));
}
for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
auto& column_pb = _footer.columns(ordinal);
_column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}
return Status::OK();
}
@ -120,15 +179,6 @@ Status Segment::_parse_index() {
}
Status Segment::_initial_column_readers() {
// Map from column unique id to column ordinal in footer's ColumnMetaPB
// If we can't find unique id, it means this segment is created
// with an old schema. So we should create a DefaultValueIterator
// for this column.
std::unordered_map<uint32_t, uint32_t> unique_id_to_ordinal;
for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
auto& column_pb = _footer.columns(ordinal);
unique_id_to_ordinal.emplace(column_pb.unique_id(), ordinal);
}
// TODO(zc): Lazy init()?
// There may be too many columns, majority of them would not be used
// in query, so we should not init them here.
@ -136,8 +186,8 @@ Status Segment::_initial_column_readers() {
for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) {
auto& column = _tablet_schema->columns()[ordinal];
auto iter = unique_id_to_ordinal.find(column.unique_id());
if (iter == unique_id_to_ordinal.end()) {
auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
if (iter == _column_id_to_footer_ordinal.end()) {
continue;
}

View File

@ -24,6 +24,7 @@
#include "common/status.h" // Status
#include "gen_cpp/segment_v2.pb.h"
#include "olap/iterators.h"
#include "olap/rowset/segment_v2/common.h" // rowid_t
#include "olap/short_key_index.h"
#include "olap/tablet_schema.h"
@ -49,7 +50,7 @@ using SegmentSharedPtr = std::shared_ptr<Segment>;
// A Segment is used to represent a segment in memory format. When segment is
// generated, it won't be modified, so this struct aimed to help read operation.
// It will prepare all ColumnReader to create ColumnIterator as needed.
// And user can create a SegmentIterator through new_iterator function.
// And user can create a RowwiseIterator through new_iterator function.
//
// NOTE: This segment is used to a specified TabletSchema, when TabletSchema
// is changed, this segment can not be used any more. For example, after a schema
@ -62,7 +63,10 @@ public:
Status open();
std::unique_ptr<SegmentIterator> new_iterator(const Schema& schema, const StorageReadOptions& read_options);
Status new_iterator(
const Schema& schema,
const StorageReadOptions& read_options,
std::unique_ptr<RowwiseIterator>* iter);
uint64_t id() const { return _segment_id; }
@ -112,6 +116,13 @@ private:
// short key index decoder
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;
bool _index_loaded;
// Map from column unique id to column ordinal in footer's ColumnMetaPB
// If we can't find unique id from it, it means this segment is created
// with an old schema.
std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
};
}

View File

@ -341,5 +341,6 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
return Status::OK();
}
}
}

View File

@ -23,7 +23,6 @@
#include "common/status.h"
#include "olap/rowset/segment_v2/common.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/iterators.h"
#include "olap/schema.h"
#include "olap/rowset/segment_v2/row_ranges.h"
#include "olap/rowset/segment_v2/column_zone_map.h"

View File

@ -99,7 +99,8 @@ TEST_F(SegmentReaderWriterTest, normal) {
// scan all rows
{
StorageReadOptions read_opts;
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -153,7 +154,8 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), true);
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 100);
st = iter->next_batch(&block);
@ -177,7 +179,8 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 100);
st = iter->next_batch(&block);
@ -205,7 +208,8 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 100);
st = iter->next_batch(&block);
@ -276,6 +280,30 @@ TEST_F(SegmentReaderWriterTest, TestZoneMap) {
ASSERT_TRUE(st.ok());
ASSERT_EQ(64 * 1024, segment->num_rows());
Schema schema(*tablet_schema);
// test empty segment iterator
{
// the first two page will be read by this condition
TCondition condition;
condition.__set_column_name("3");
condition.__set_condition_op("<");
std::vector<std::string> vals = {"2"};
condition.__set_condition_values(vals);
std::shared_ptr<Conditions> conditions(new Conditions());
conditions->set_tablet_schema(tablet_schema.get());
conditions->append_condition(condition);
StorageReadOptions read_opts;
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1);
st = iter->next_batch(&block);
ASSERT_TRUE(st.is_end_of_file());
ASSERT_EQ(0, block.num_rows());
}
// scan all rows
{
TCondition condition;
@ -290,7 +318,8 @@ TEST_F(SegmentReaderWriterTest, TestZoneMap) {
StorageReadOptions read_opts;
read_opts.conditions = conditions.get();
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -348,7 +377,8 @@ TEST_F(SegmentReaderWriterTest, TestZoneMap) {
read_opts.conditions = conditions.get();
read_opts.delete_conditions.push_back(delete_conditions.get());
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -517,7 +547,8 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
// scan all rows
{
StorageReadOptions read_opts;
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -571,7 +602,8 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
// scan all rows
{
StorageReadOptions read_opts;
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -665,7 +697,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
// scan all rows
{
StorageReadOptions read_opts;
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
@ -710,7 +743,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 100);
st = iter->next_batch(&block);
@ -739,7 +773,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 100);
st = iter->next_batch(&block);
@ -761,7 +796,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.conditions = conditions.get();
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);
int left = 4 * 1024;
@ -810,7 +846,8 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.conditions = conditions.get();
std::unique_ptr<SegmentIterator> iter = segment->new_iterator(schema, read_opts);
std::unique_ptr<RowwiseIterator> iter;
segment->new_iterator(schema, read_opts, &iter);
RowBlockV2 block(schema, 1024);

View File

@ -91,11 +91,12 @@ message ColumnMetaPB {
optional bool is_nullable = 6;
// ordinal index page
optional PagePointerPB ordinal_index_page = 7;
// zone map page
// page-level zone map index
optional PagePointerPB zone_map_page = 8;
// segment-level zone map
optional ZoneMapPB zone_map = 9;
// // dictionary page for DICT_ENCODING
optional PagePointerPB dict_page = 9;
optional PagePointerPB dict_page = 10;
// // bloom filter pages for bloom filter column
// repeated PagePointerPB bloom_filter_pages = 3;