Make ColumnReader load lazily (#2026)

[Storage][SegmentV2]
Currently `segment_v2::Segment::open` will eagerly initialize all column readers, regardless of whether the column is queried or not. Initializing `segment_v2::ColumnReader` incurs additional I/O cost to read ordinal index and zonemap index and should be delayed to the time it's needed.
This commit is contained in:
Dayue Gao
2019-10-23 10:25:28 +08:00
committed by ZHAO Chun
parent 0f94b685ab
commit d25f0ba69a
8 changed files with 122 additions and 108 deletions

View File

@ -17,6 +17,7 @@
#include "olap/rowset/segment_v2/column_reader.h"
#include "common/logging.h"
#include "env/env.h" // for RandomAccessFile
#include "gutil/strings/substitute.h" // for Substitute
#include "olap/rowset/segment_v2/encoding_info.h" // for EncodingInfo
@ -71,37 +72,34 @@ struct ParsedPage {
size_t remaining() const { return num_rows - offset_in_page; }
};
Status ColumnReader::create(const ColumnReaderOptions& opts,
const ColumnMetaPB& meta,
uint64_t num_rows,
RandomAccessFile* file,
std::unique_ptr<ColumnReader>* reader) {
std::unique_ptr<ColumnReader> reader_local(
new ColumnReader(opts, meta, num_rows, file));
RETURN_IF_ERROR(reader_local->init());
*reader = std::move(reader_local);
return Status::OK();
}
ColumnReader::ColumnReader(const ColumnReaderOptions& opts,
const ColumnMetaPB& meta,
uint64_t num_rows,
RandomAccessFile* file)
: _opts(opts),
_meta(meta),
_num_rows(num_rows),
_file(file) {
: _opts(opts), _meta(meta), _num_rows(num_rows), _file(file) {
}
ColumnReader::~ColumnReader() {
}
ColumnReader::~ColumnReader() = default;
Status ColumnReader::init() {
return _init_once.call([this] { return _do_init_once(); });
}
Status ColumnReader::_do_init_once() {
_type_info = get_type_info((FieldType)_meta.type());
if (_type_info == nullptr) {
return Status::NotSupported(Substitute("unsupported typeinfo, type=$0", _meta.type()));
}
RETURN_IF_ERROR(EncodingInfo::get(_type_info, _meta.encoding(), &_encoding_info));
// Get compress codec
RETURN_IF_ERROR(get_block_compression_codec(_meta.compression(), &_compress_codec));
RETURN_IF_ERROR(_init_ordinal_index());
RETURN_IF_ERROR(_init_column_zone_map());
return Status::OK();
}
@ -175,20 +173,23 @@ Status ColumnReader::read_page(const PagePointer& pp, OlapReaderStatistics* stat
return Status::OK();
}
void ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions, OlapReaderStatistics* stats,
RowRanges* row_ranges) {
Status ColumnReader::get_row_ranges_by_zone_map(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats,
RowRanges* row_ranges) {
DCHECK(has_zone_map());
RETURN_IF_ERROR(_ensure_index_loaded());
std::vector<uint32_t> page_indexes;
_get_filtered_pages(cond_column, stats, delete_conditions, &page_indexes);
_calculate_row_ranges(page_indexes, row_ranges);
RETURN_IF_ERROR(_get_filtered_pages(cond_column, delete_conditions, stats, &page_indexes));
RETURN_IF_ERROR(_calculate_row_ranges(page_indexes, row_ranges));
return Status::OK();
}
PagePointer ColumnReader::get_dict_page_pointer() const {
return _meta.dict_page();
}
void ColumnReader::_get_filtered_pages(CondColumn* cond_column, OlapReaderStatistics* stats,
const std::vector<CondColumn*>& delete_conditions, std::vector<uint32_t>* page_indexes) {
Status ColumnReader::_get_filtered_pages(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats,
std::vector<uint32_t>* page_indexes) {
FieldType type = _type_info->type();
const std::vector<ZoneMapPB>& zone_maps = _column_zone_map->get_column_zone_map();
int32_t page_size = _column_zone_map->num_pages();
@ -231,19 +232,20 @@ void ColumnReader::_get_filtered_pages(CondColumn* cond_column, OlapReaderStatis
stats->rows_stats_filtered += page_last_id - page_first_id + 1;
}
}
return Status::OK();
}
void ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges) {
Status ColumnReader::_calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges) {
for (auto i : page_indexes) {
rowid_t page_first_id = _ordinal_index->get_first_row_id(i);
rowid_t page_last_id = _ordinal_index->get_last_row_id(i);
RowRanges page_row_ranges(RowRanges::create_single(page_first_id, page_last_id + 1));
RowRanges::ranges_union(*row_ranges, page_row_ranges, row_ranges);
}
return Status::OK();
}
// initial ordinal index
Status ColumnReader::_init_ordinal_index() {
Status ColumnReader::_load_ordinal_index() {
PagePointer pp = _meta.ordinal_index_page();
PageHandle ph;
OlapReaderStatistics stats;
@ -251,12 +253,10 @@ Status ColumnReader::_init_ordinal_index() {
_ordinal_index.reset(new OrdinalPageIndex(ph.data(), _num_rows));
RETURN_IF_ERROR(_ordinal_index->load());
return Status::OK();
}
// initialize column zone map
Status ColumnReader::_init_column_zone_map() {
Status ColumnReader::_load_zone_map_index() {
if (_meta.has_zone_map_page()) {
PagePointer pp = _meta.zone_map_page();
PageHandle ph;
@ -272,6 +272,7 @@ Status ColumnReader::_init_column_zone_map() {
}
Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) {
RETURN_IF_ERROR(_ensure_index_loaded());
*iter = _ordinal_index->begin();
if (!iter->valid()) {
return Status::NotFound("Failed to seek to first rowid");
@ -280,6 +281,7 @@ Status ColumnReader::seek_to_first(OrdinalPageIndexIterator* iter) {
}
Status ColumnReader::seek_at_or_before(rowid_t rowid, OrdinalPageIndexIterator* iter) {
RETURN_IF_ERROR(_ensure_index_loaded());
*iter = _ordinal_index->seek_at_or_before(rowid);
if (!iter->valid()) {
return Status::NotFound(Substitute("Failed to seek to rowid $0, ", rowid));
@ -290,8 +292,7 @@ Status ColumnReader::seek_at_or_before(rowid_t rowid, OrdinalPageIndexIterator*
FileColumnIterator::FileColumnIterator(ColumnReader* reader) : _reader(reader) {
}
FileColumnIterator::~FileColumnIterator() {
}
FileColumnIterator::~FileColumnIterator() = default;
Status FileColumnIterator::seek_to_first() {
RETURN_IF_ERROR(_reader->seek_to_first(&_page_iter));

View File

@ -63,12 +63,15 @@ struct ColumnIteratorOptions {
// This will cache data shared by all reader
class ColumnReader {
public:
ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
uint64_t num_rows, RandomAccessFile* file);
~ColumnReader();
// Create an initialized ColumnReader in *reader.
// This should be a lightweight operation without I/O.
static Status create(const ColumnReaderOptions& opts,
const ColumnMetaPB& meta,
uint64_t num_rows,
RandomAccessFile* file,
std::unique_ptr<ColumnReader>* reader);
// May be called multiple times, subsequent calls will no op.
Status init();
~ColumnReader();
// create a new column iterator. Client should delete returned iterator
Status new_iterator(ColumnIterator** iterator);
@ -81,47 +84,63 @@ public:
Status read_page(const PagePointer& pp, OlapReaderStatistics* stats, PageHandle* handle);
bool is_nullable() const { return _meta.is_nullable(); }
const EncodingInfo* encoding_info() const { return _encoding_info; }
const TypeInfo* type_info() const { return _type_info; }
bool has_zone_map() { return _meta.has_zone_map_page(); }
bool has_zone_map() const { return _meta.has_zone_map_page(); }
// get row ranges with zone map
// cond_column is user's query predicate
// delete_conditions is a vector of delete predicate of different version
void get_row_ranges_by_zone_map(CondColumn* cond_column, const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats, RowRanges* row_ranges);
// - cond_column is user's query predicate
// - delete_conditions is a vector of delete predicate of different version
Status get_row_ranges_by_zone_map(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats,
RowRanges* row_ranges);
PagePointer get_dict_page_pointer() const;
PagePointer get_dict_page_pointer() const { return _meta.dict_page(); }
private:
Status _do_init_once();
ColumnReader(const ColumnReaderOptions& opts,
const ColumnMetaPB& meta,
uint64_t num_rows,
RandomAccessFile* file);
Status init();
Status _init_ordinal_index();
// Read and load necessary column indexes into memory if it hasn't been loaded.
// May be called multiple times, subsequent calls will no op.
Status _ensure_index_loaded() {
return _load_index_once.call([this] {
RETURN_IF_ERROR(_load_zone_map_index());
RETURN_IF_ERROR(_load_ordinal_index());
return Status::OK();
});
}
Status _load_zone_map_index();
Status _load_ordinal_index();
Status _init_column_zone_map();
Status _get_filtered_pages(CondColumn* cond_column,
const std::vector<CondColumn*>& delete_conditions,
OlapReaderStatistics* stats,
std::vector<uint32_t>* page_indexes);
void _get_filtered_pages(CondColumn* cond_column, OlapReaderStatistics* stats,
const std::vector<CondColumn*>& delete_conditions, std::vector<uint32_t>* page_indexes);
void _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges);
Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges);
private:
ColumnReaderOptions _opts;
ColumnMetaPB _meta;
uint64_t _num_rows;
RandomAccessFile* _file = nullptr;
RandomAccessFile* _file;
DorisCallOnce<Status> _init_once;
// initialized in init()
const TypeInfo* _type_info = nullptr;
const EncodingInfo* _encoding_info = nullptr;
const BlockCompressionCodec* _compress_codec = nullptr;
// get page pointer from index
std::unique_ptr<OrdinalPageIndex> _ordinal_index;
// column zone map info
DorisCallOnce<Status> _load_index_once;
std::unique_ptr<ColumnZoneMap> _column_zone_map;
std::unique_ptr<OrdinalPageIndex> _ordinal_index;
};
// Base iterator to read one column data

View File

@ -51,18 +51,12 @@ Segment::Segment(
_tablet_schema(tablet_schema) {
}
Segment::~Segment() {
for (auto reader : _column_readers) {
delete reader;
}
}
Segment::~Segment() = default;
Status Segment::_open() {
RETURN_IF_ERROR(Env::Default()->new_random_access_file(_fname, &_input_file));
// parse footer to get meta
RETURN_IF_ERROR(_parse_footer());
// initial all column reader
RETURN_IF_ERROR(_initial_column_readers());
RETURN_IF_ERROR(_create_column_readers());
return Status::OK();
}
@ -161,11 +155,6 @@ Status Segment::_parse_footer() {
if (!_footer.ParseFromString(footer_buf)) {
return Status::Corruption(Substitute("Bad segment file $0: failed to parse SegmentFooterPB", _fname));
}
for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
auto& column_pb = _footer.columns(ordinal);
_column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}
return Status::OK();
}
@ -183,12 +172,13 @@ Status Segment::_load_index() {
});
}
Status Segment::_initial_column_readers() {
// TODO(zc): Lazy init()?
// There may be too many columns, majority of them would not be used
// in query, so we should not init them here.
_column_readers.resize(_tablet_schema->columns().size(), nullptr);
Status Segment::_create_column_readers() {
for (uint32_t ordinal = 0; ordinal < _footer.columns().size(); ++ordinal) {
auto& column_pb = _footer.columns(ordinal);
_column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}
_column_readers.resize(_tablet_schema->columns().size());
for (uint32_t ordinal = 0; ordinal < _tablet_schema->num_columns(); ++ordinal) {
auto& column = _tablet_schema->columns()[ordinal];
auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
@ -197,11 +187,10 @@ Status Segment::_initial_column_readers() {
}
ColumnReaderOptions opts;
std::unique_ptr<ColumnReader> reader(
new ColumnReader(opts, _footer.columns(iter->second), _footer.num_rows(), _input_file.get()));
RETURN_IF_ERROR(reader->init());
_column_readers[ordinal] = reader.release();
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(
opts, _footer.columns(iter->second), _footer.num_rows(), _input_file.get(), &reader));
_column_readers[ordinal] = std::move(reader);
}
return Status::OK();
}

View File

@ -83,7 +83,7 @@ private:
// open segment file and read the minimum amount of necessary information (footer)
Status _open();
Status _parse_footer();
Status _initial_column_readers();
Status _create_column_readers();
Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
size_t num_short_keys() const { return _tablet_schema->num_short_key_columns(); }
@ -122,10 +122,15 @@ private:
SegmentFooterPB _footer;
std::unique_ptr<RandomAccessFile> _input_file;
// Map from column unique id to column ordinal in footer's ColumnMetaPB
// If we can't find unique id from it, it means this segment is created
// with an old schema.
std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
// ColumnReader for each column in TabletSchema. If ColumnReader is nullptr,
// This means that this segment has no data for that column, which may be added
// after this segment is generated.
std::vector<ColumnReader*> _column_readers;
std::vector<std::unique_ptr<ColumnReader>> _column_readers;
// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;
@ -133,11 +138,6 @@ private:
faststring _sk_index_buf;
// short key index decoder
std::unique_ptr<ShortKeyIndexDecoder> _sk_index_decoder;
// Map from column unique id to column ordinal in footer's ColumnMetaPB
// If we can't find unique id from it, it means this segment is created
// with an old schema.
std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
};
}

View File

@ -170,8 +170,12 @@ Status SegmentIterator::_get_row_ranges_from_zone_map(RowRanges* zone_map_row_ra
}
// get row ranges by zone map of this column
RowRanges column_zone_map_row_ranges;
_segment->_column_readers[cid]->get_row_ranges_by_zone_map(_opts.conditions->get_column(cid),
column_delete_conditions[cid], _opts.stats, &column_zone_map_row_ranges);
RETURN_IF_ERROR(
_segment->_column_readers[cid]->get_row_ranges_by_zone_map(
_opts.conditions->get_column(cid),
column_delete_conditions[cid],
_opts.stats,
&column_zone_map_row_ranges));
// intersection different columns's row ranges to get final row ranges by zone map
RowRanges::ranges_intersection(origin_row_ranges, column_zone_map_row_ranges, &origin_row_ranges);
}

View File

@ -107,15 +107,12 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
ASSERT_TRUE(st.ok());
ColumnReaderOptions reader_opts;
ColumnReader reader(reader_opts, meta, num_rows, rfile.get());
st = reader.init();
std::unique_ptr<ColumnReader> reader;
st = ColumnReader::create(reader_opts, meta, num_rows, rfile.get(), &reader);
ASSERT_TRUE(st.ok());
ASSERT_EQ(reader._ordinal_index->num_pages(), reader._column_zone_map->get_column_zone_map().size());
ColumnIterator* iter = nullptr;
st = reader.new_iterator(&iter);
st = reader->new_iterator(&iter);
ASSERT_TRUE(st.ok());
ColumnIteratorOptions iter_opts;
OlapReaderStatistics stats;
@ -137,7 +134,7 @@ void test_nullable_data(uint8_t* src_data, uint8_t* src_is_null, int num_rows, s
int idx = 0;
while (true) {
size_t rows_read = 1024;
auto st = iter->next_batch(&rows_read, &col);
st = iter->next_batch(&rows_read, &col);
ASSERT_TRUE(st.ok());
for (int j = 0; j < rows_read; ++j) {
// LOG(INFO) << "is_null=" << is_null[j] << ", src_is_null[]=" << src_is_null[idx]

View File

@ -29,13 +29,15 @@ public:
void test_string(Field* field) {
ColumnZoneMapBuilder builder(field);
std::vector<std::string> values1 = {"aaaa", "bbbb", "cccc", "dddd", "eeee", "ffff"};
for (auto value : values1) {
builder.add((const uint8_t*)&value, 1);
for (auto& value : values1) {
Slice slice(value);
builder.add((const uint8_t*)&slice, 1);
}
builder.flush();
std::vector<std::string> values2 = {"aaaaa", "bbbbb", "ccccc", "ddddd", "eeeee", "fffff"};
for (auto value : values2) {
builder.add((const uint8_t*)&value, 1);
for (auto& value : values2) {
Slice slice(value);
builder.add((const uint8_t*)&slice, 1);
}
builder.add(nullptr, 1);
builder.flush();

View File

@ -83,8 +83,9 @@ TabletColumn create_varchar_key(int32_t id, bool is_nullable = true) {
void set_column_value_by_type(FieldType fieldType, int src, char* target, MemPool* pool, size_t _length = 0) {
if (fieldType == OLAP_FIELD_TYPE_CHAR) {
char* src_value = &std::to_string(src)[0];
int src_len = strlen(src_value);
std::string s = std::to_string(src);
char* src_value = &s[0];
int src_len = s.size();
auto* dest_slice = (Slice*)target;
dest_slice->size = _length;
@ -92,13 +93,14 @@ void set_column_value_by_type(FieldType fieldType, int src, char* target, MemPoo
memcpy(dest_slice->data, src_value, src_len);
memset(dest_slice->data + src_len, 0, dest_slice->size - src_len);
} else if (fieldType == OLAP_FIELD_TYPE_VARCHAR) {
char* src_value = &std::to_string(src)[0];
int src_len = strlen(src_value);
std::string s = std::to_string(src);
char* src_value = &s[0];
int src_len = s.size();
auto* dest_slice = (Slice*)target;
dest_slice->size = src_len;
dest_slice->data = (char*)pool->allocate(src_len);
std::memcpy(dest_slice->data, src_value, src_len);
memcpy(dest_slice->data, src_value, src_len);
} else {
*(int*)target = src;
}