[improvement] improvement for light weight schema change (#10860)

* improvement for dynamic schema
not use schema as lru cache key any more.
load segment just use the rowset's original schema not the current read schema.
generate column reader and column iterator using the original schema, using the read schema if it is a new column.
using column unique id as key instead of column ordinals.
Co-authored-by: yiguolei <yiguolei@gmail.com>
This commit is contained in:
yiguolei
2022-07-18 17:53:31 +08:00
committed by GitHub
parent ba04c983ae
commit a2ed4b5c78
11 changed files with 75 additions and 47 deletions

View File

@ -23,6 +23,7 @@
#include "olap/block_column_predicate.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
#include "olap/tablet_schema.h"
#include "vec/core/block.h"
namespace doris {
@ -83,6 +84,8 @@ public:
OlapReaderStatistics* stats = nullptr;
bool use_page_cache = false;
int block_row_max = 4096;
const TabletSchema* tablet_schema = nullptr;
};
// Used to read data in RowBlockV2 one by one

View File

@ -69,16 +69,15 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
return Status::OK();
}
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
const TabletSchema* read_tablet_schema) {
Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) {
auto fs = _rowset_meta->fs();
if (!fs) {
if (!fs || _schema == nullptr) {
return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
}
for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
auto seg_path = segment_file_path(seg_id);
std::shared_ptr<segment_v2::Segment> segment;
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, read_tablet_schema, &segment);
auto s = segment_v2::Segment::open(fs, seg_path, seg_id, _schema, &segment);
if (!s.ok()) {
LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset "
<< unique_id() << " : " << s.to_string();

View File

@ -70,8 +70,7 @@ public:
bool check_file_exist() override;
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments,
const TabletSchema* read_tablet_schema);
Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
protected:
BetaRowset(const TabletSchema* schema, const std::string& tablet_path,

View File

@ -83,10 +83,11 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
}
}
read_options.use_page_cache = read_context->use_page_cache;
read_options.tablet_schema = read_context->tablet_schema;
// load segments
RETURN_NOT_OK(SegmentLoader::instance()->load_segments(
_rowset, &_segment_cache_handle, read_context->tablet_schema,
_rowset, &_segment_cache_handle,
read_context->reader_type == ReaderType::READER_QUERY));
// create iterator for each segment

View File

@ -70,12 +70,12 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
// trying to prune the current segment by segment-level zone map
if (read_options.conditions != nullptr) {
for (auto& column_condition : read_options.conditions->columns()) {
int32_t column_id = column_condition.first;
if (_column_readers[column_id] == nullptr ||
!_column_readers[column_id]->has_zone_map()) {
int32_t column_unique_id = _tablet_schema.column(column_condition.first).unique_id();
if (_column_readers.count(column_unique_id) < 1 ||
!_column_readers.at(column_unique_id)->has_zone_map()) {
continue;
}
if (!_column_readers[column_id]->match_condition(column_condition.second)) {
if (!_column_readers.at(column_unique_id)->match_condition(column_condition.second)) {
// any condition not satisfied, return.
iter->reset(new EmptySegmentIterator(schema));
read_options.stats->filtered_segment_number++;
@ -168,9 +168,8 @@ Status Segment::_create_column_readers() {
_column_id_to_footer_ordinal.emplace(column_pb.unique_id(), ordinal);
}
_column_readers.resize(_tablet_schema.columns().size());
for (uint32_t ordinal = 0; ordinal < _tablet_schema.num_columns(); ++ordinal) {
auto& column = _tablet_schema.columns()[ordinal];
auto& column = _tablet_schema.column(ordinal);
auto iter = _column_id_to_footer_ordinal.find(column.unique_id());
if (iter == _column_id_to_footer_ordinal.end()) {
continue;
@ -181,14 +180,20 @@ Status Segment::_create_column_readers() {
std::unique_ptr<ColumnReader> reader;
RETURN_IF_ERROR(ColumnReader::create(opts, _footer.columns(iter->second),
_footer.num_rows(), _file_reader, &reader));
_column_readers[ordinal] = std::move(reader);
_column_readers.emplace(column.unique_id(), std::move(reader));
}
return Status::OK();
}
Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
if (_column_readers[cid] == nullptr) {
const TabletColumn& tablet_column = _tablet_schema.column(cid);
// Not use cid anymore, for example original table schema is colA int, then user do following actions
// 1.add column b
// 2. drop column b
// 3. add column c
// in the new schema column c's cid == 2
// but in the old schema column b's cid == 2
// but they are not the same column
Status Segment::new_column_iterator(const TabletColumn& tablet_column, ColumnIterator** iter) {
if (_column_readers.count(tablet_column.unique_id()) < 1) {
if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
return Status::InternalError("invalid nonexistent column without default value.");
}
@ -204,12 +209,15 @@ Status Segment::new_column_iterator(uint32_t cid, ColumnIterator** iter) {
*iter = default_value_iter.release();
return Status::OK();
}
return _column_readers[cid]->new_iterator(iter);
return _column_readers.at(tablet_column.unique_id())->new_iterator(iter);
}
Status Segment::new_bitmap_index_iterator(uint32_t cid, BitmapIndexIterator** iter) {
if (_column_readers[cid] != nullptr && _column_readers[cid]->has_bitmap_index()) {
return _column_readers[cid]->new_bitmap_index_iterator(iter);
Status Segment::new_bitmap_index_iterator(const TabletColumn& tablet_column,
BitmapIndexIterator** iter) {
auto col_unique_id = tablet_column.unique_id();
if (_column_readers.count(col_unique_id) > 0 &&
_column_readers.at(col_unique_id)->has_bitmap_index()) {
return _column_readers.at(col_unique_id)->new_bitmap_index_iterator(iter);
}
return Status::OK();
}

View File

@ -72,9 +72,9 @@ public:
uint32_t num_rows() const { return _footer.num_rows(); }
Status new_column_iterator(uint32_t cid, ColumnIterator** iter);
Status new_column_iterator(const TabletColumn& tablet_column, ColumnIterator** iter);
Status new_bitmap_index_iterator(uint32_t cid, BitmapIndexIterator** iter);
Status new_bitmap_index_iterator(const TabletColumn& tablet_column, BitmapIndexIterator** iter);
size_t num_short_keys() const { return _tablet_schema.num_short_key_columns(); }
@ -133,10 +133,11 @@ private:
// with an old schema.
std::unordered_map<uint32_t, uint32_t> _column_id_to_footer_ordinal;
// map column unique id ---> column reader
// ColumnReader for each column in TabletSchema. If ColumnReader is nullptr,
// This means that this segment has no data for that column, which may be added
// after this segment is generated.
std::vector<std::unique_ptr<ColumnReader>> _column_readers;
std::map<int32_t, std::unique_ptr<ColumnReader>> _column_readers;
// used to guarantee that short key index will be loaded at most once in a thread-safe way
DorisCallOnce<Status> _load_index_once;

View File

@ -216,7 +216,8 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
// create used column iterator
for (auto cid : _seek_schema->column_ids()) {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(cid, &_column_iterators[cid]));
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
&_column_iterators[cid]));
ColumnIteratorOptions iter_opts;
iter_opts.stats = _opts.stats;
iter_opts.file_reader = _file_reader.get();
@ -344,7 +345,8 @@ Status SegmentIterator::_init_return_column_iterators() {
}
for (auto cid : _schema.column_ids()) {
if (_column_iterators[cid] == nullptr) {
RETURN_IF_ERROR(_segment->new_column_iterator(cid, &_column_iterators[cid]));
RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
&_column_iterators[cid]));
ColumnIteratorOptions iter_opts;
iter_opts.stats = _opts.stats;
iter_opts.use_page_cache = _opts.use_page_cache;
@ -361,8 +363,8 @@ Status SegmentIterator::_init_bitmap_index_iterators() {
}
for (auto cid : _schema.column_ids()) {
if (_bitmap_index_iterators[cid] == nullptr) {
RETURN_IF_ERROR(
_segment->new_bitmap_index_iterator(cid, &_bitmap_index_iterators[cid]));
RETURN_IF_ERROR(_segment->new_bitmap_index_iterator(_opts.tablet_schema->column(cid),
&_bitmap_index_iterators[cid]));
}
}
return Status::OK();

View File

@ -59,17 +59,16 @@ void SegmentLoader::_insert(const SegmentLoader::CacheKey& key, SegmentLoader::C
}
Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle,
const TabletSchema* read_tablet_schema, bool use_cache) {
SegmentLoader::CacheKey cache_key(rowset->rowset_id(), *read_tablet_schema);
if (use_cache && _lookup(cache_key, cache_handle)) {
SegmentCacheHandle* cache_handle, bool use_cache) {
SegmentLoader::CacheKey cache_key(rowset->rowset_id());
if (_lookup(cache_key, cache_handle)) {
cache_handle->owned = false;
return Status::OK();
}
cache_handle->owned = !use_cache;
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_NOT_OK(rowset->load_segments(&segments, read_tablet_schema));
RETURN_NOT_OK(rowset->load_segments(&segments));
if (use_cache) {
// memory of SegmentLoader::CacheValue will be handled by SegmentLoader

View File

@ -49,17 +49,11 @@ class SegmentLoader {
public:
// The cache key or segment lru cache
struct CacheKey {
CacheKey(RowsetId rowset_id_, const TabletSchema& tablet_schema)
: rowset_id(rowset_id_), tablet_schema(tablet_schema) {}
CacheKey(RowsetId rowset_id_) : rowset_id(rowset_id_) {}
RowsetId rowset_id;
TabletSchema tablet_schema;
// Encode to a flat binary which can be used as LRUCache's key
std::string encode() const {
TabletSchemaPB tablet_schema_pb;
tablet_schema.to_schema_pb(&tablet_schema_pb);
return rowset_id.to_string() + tablet_schema_pb.SerializeAsString();
}
std::string encode() const { return rowset_id.to_string(); }
};
// The cache value of segment lru cache.
@ -89,7 +83,7 @@ public:
// Load segments of "rowset", return the "cache_handle" which contains segments.
// If use_cache is true, it will be loaded from _cache.
Status load_segments(const BetaRowsetSharedPtr& rowset, SegmentCacheHandle* cache_handle,
const TabletSchema* read_tablet_schema, bool use_cache = false);
bool use_cache = false);
// Try to prune the segment cache if expired.
Status prune();

View File

@ -432,8 +432,6 @@ TEST_F(BetaRowsetTest, ReadTest) {
std::make_shared<io::S3FileSystem>(properties, "bucket", "test prefix", resource_id);
Aws::SDKOptions aws_options = Aws::SDKOptions {};
Aws::InitAPI(aws_options);
TabletSchema dummy_schema;
// failed to head object
{
Aws::Auth::AWSCredentials aws_cred("ak", "sk");
@ -447,7 +445,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}
@ -462,7 +460,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}
@ -477,7 +475,7 @@ TEST_F(BetaRowsetTest, ReadTest) {
rowset.rowset_meta()->set_fs(fs);
std::vector<segment_v2::SegmentSharedPtr> segments;
Status st = rowset.load_segments(&segments, &dummy_schema);
Status st = rowset.load_segments(&segments);
ASSERT_FALSE(st.ok());
}

View File

@ -168,6 +168,7 @@ TEST_F(SegmentReaderWriterTest, normal) {
{
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -223,6 +224,7 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), true);
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -249,6 +251,7 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -278,6 +281,7 @@ TEST_F(SegmentReaderWriterTest, normal) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -315,6 +319,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {
StorageReadOptions read_opts;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());
@ -339,6 +344,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {
StorageReadOptions read_opts;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());
@ -359,6 +365,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {
OlapReaderStatistics stats;
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());
@ -391,6 +398,7 @@ TEST_F(SegmentReaderWriterTest, LazyMaterialization) {
StorageReadOptions read_opts;
read_opts.column_predicates = predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(read_schema, read_opts, &iter).ok());
@ -452,6 +460,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
@ -475,6 +484,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
@ -534,6 +544,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
read_opts.conditions = conditions.get();
read_opts.delete_conditions.push_back(delete_conditions.get());
@ -575,6 +586,7 @@ TEST_F(SegmentReaderWriterTest, TestIndex) {
{
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
TCondition condition;
condition.__set_column_name("2");
condition.__set_condition_op("=");
@ -674,6 +686,7 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
{
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &query_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -724,6 +737,7 @@ TEST_F(SegmentReaderWriterTest, TestDefaultValueColumn) {
{
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = &query_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -826,6 +840,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
{
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = tablet_schema.get();
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -878,6 +893,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = tablet_schema.get();
read_opts.key_ranges.emplace_back(lower_bound.get(), false, nullptr, false);
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -911,6 +927,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = tablet_schema.get();
read_opts.key_ranges.emplace_back(lower_bound.get(), false, upper_bound.get(), false);
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -934,6 +951,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = tablet_schema.get();
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
@ -991,6 +1009,7 @@ TEST_F(SegmentReaderWriterTest, TestStringDict) {
StorageReadOptions read_opts;
read_opts.stats = &stats;
read_opts.tablet_schema = tablet_schema.get();
read_opts.conditions = conditions.get();
std::unique_ptr<RowwiseIterator> iter;
@ -1029,6 +1048,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
OlapReaderStatistics stats;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -1051,6 +1071,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
OlapReaderStatistics stats;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -1073,6 +1094,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
OlapReaderStatistics stats;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -1097,6 +1119,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
OlapReaderStatistics stats;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());
@ -1120,6 +1143,7 @@ TEST_F(SegmentReaderWriterTest, TestBitmapPredicate) {
OlapReaderStatistics stats;
read_opts.column_predicates = column_predicates;
read_opts.stats = &stats;
read_opts.tablet_schema = &tablet_schema;
std::unique_ptr<RowwiseIterator> iter;
ASSERT_TRUE(segment->new_iterator(schema, read_opts, &iter).ok());