[refactor](storage) VGenericIterator to reuse Schema (#7858)

1. Reuse Schema to avoid copying, because cloning a Schema generates many sub Field objects.
2. Call the interface provided by Block to reduce the number of code lines.
This commit is contained in:
zuochunwei
2022-02-09 13:06:03 +08:00
committed by GitHub
parent 0553ce2944
commit db20e1f323
6 changed files with 29 additions and 46 deletions

View File

@ -111,7 +111,7 @@ public:
private:
Status _copy_data_to_column(int cid, vectorized::MutableColumnPtr& mutable_column_ptr);
Schema _schema;
const Schema& _schema;
size_t _capacity;
// _column_vector_batches[cid] == null if cid is not in `_schema`.
// memory are not allocated from `_pool` because we don't wan't to reallocate them in clear()

View File

@ -55,7 +55,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
_stats = _context->stats;
}
// SegmentIterator will load seek columns on demand
Schema schema(_context->tablet_schema->columns(), *(_context->return_columns));
_schema = std::make_unique<Schema>(_context->tablet_schema->columns(), *(_context->return_columns));
// convert RowsetReaderContext to StorageReadOptions
StorageReadOptions read_options;
@ -102,7 +102,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(schema, read_options, _parent_tracker, &iter);
auto s = seg_ptr->new_iterator(*_schema, read_options, _parent_tracker, &iter);
if (!s.ok()) {
LOG(WARNING) << "failed to create iterator[" << seg_ptr->id() << "]: " << s.to_string();
return OLAP_ERR_ROWSET_READER_INIT;
@ -131,7 +131,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
_iterator.reset(final_iterator);
// init input block
_input_block.reset(new RowBlockV2(schema,
_input_block.reset(new RowBlockV2(*_schema,
std::min(1024, read_context->batch_size), _parent_tracker));
if (!read_context->is_vec) {

View File

@ -58,6 +58,7 @@ public:
RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
private:
std::unique_ptr<Schema> _schema;
RowsetReaderContext* _context;
BetaRowsetSharedPtr _rowset;

View File

@ -35,7 +35,7 @@ public:
Status next_batch(vectorized::Block* block) override;
private:
Schema _schema;
const Schema& _schema;
};
} // namespace segment_v2

View File

@ -103,8 +103,7 @@ private:
class BitmapRangeIterator;
std::shared_ptr<Segment> _segment;
// TODO(zc): rethink if we need copy it
Schema _schema;
const Schema& _schema;
// _column_iterators.size() == _schema.num_columns()
// _column_iterators[cid] == nullptr if cid is not in _schema
std::vector<ColumnIterator*> _column_iterators;

View File

@ -100,7 +100,7 @@ public:
const Schema& schema() const override { return _schema; }
private:
Schema _schema;
const Schema& _schema;
size_t _num_rows;
size_t _rows_returned;
};
@ -136,12 +136,19 @@ public:
{
if (!_block) {
const Schema& schema = _iter->schema();
for (auto &column_desc : schema.columns()) {
const auto& column_ids = schema.column_ids();
for (size_t i = 0; i < schema.num_column_ids(); ++i) {
auto column_desc = schema.column(column_ids[i]);
auto data_type = Schema::get_data_type_ptr(column_desc->type());
if (data_type == nullptr) {
return Status::RuntimeError("invalid data type");
}
_block.insert(ColumnWithTypeAndName(data_type->create_column(), data_type, column_desc->name()));
if (column_desc->is_nullable()) {
data_type = std::make_shared<vectorized::DataTypeNullable>(std::move(data_type));
}
auto column = data_type->create_column();
column->reserve(_block_row_max);
_block.insert(ColumnWithTypeAndName(std::move(column), data_type, column_desc->name()));
}
} else {
_block.clear_column_data();
@ -152,43 +159,17 @@ public:
// Initialize this context and will prepare data for current_row()
Status init(const StorageReadOptions& opts);
int compare_row(const VMergeIteratorContext& rhs) const {
bool compare(const VMergeIteratorContext& rhs) const {
const Schema& schema = _iter->schema();
int num = schema.num_key_columns();
for (uint32_t cid = 0; cid < num; ++cid) {
#if 0
auto name = schema.column(cid)->name();
auto l_col = this->_block.get_by_name(name);
auto r_col = rhs._block.get_by_name(name);
#else
//because the columns of block will be inserted by cid asc order
//so no need to get column by get_by_name()
auto l_col = this->_block.get_by_position(cid);
auto r_col = rhs._block.get_by_position(cid);
#endif
auto l_cp = l_col.column;
auto r_cp = r_col.column;
auto res = l_cp->compare_at(_index_in_block, rhs._index_in_block, *r_cp, -1);
if (res) {
return res;
}
}
return 0;
}
bool compare(const VMergeIteratorContext& rhs) const {
int cmp_res = this->compare_row(rhs);
int cmp_res = this->_block.compare_at(_index_in_block, rhs._index_in_block, num, rhs._block, -1);
if (cmp_res != 0) {
return cmp_res > 0;
}
return this->data_id() < rhs.data_id();
}
void copy_row_to(vectorized::Block* block) {
void copy_row(vectorized::Block* block) {
vectorized::Block& src = _block;
vectorized::Block& dst = *block;
@ -230,9 +211,11 @@ private:
bool _valid = false;
size_t _index_in_block = -1;
int _block_row_max = 4096;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
_block_row_max = opts.block_row_max;
RETURN_IF_ERROR(_iter->init(opts));
RETURN_IF_ERROR(block_reset());
RETURN_IF_ERROR(_load_next_block());
@ -246,7 +229,7 @@ Status VMergeIteratorContext::advance() {
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
if (_index_in_block < _block.rows()) {
if (LIKELY(_index_in_block < _block.rows())) {
return Status::OK();
}
// current batch has no data, load next batch
@ -299,7 +282,7 @@ private:
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
std::unique_ptr<Schema> _schema;
const Schema* _schema = nullptr;
struct VMergeContextComparator {
bool operator()(const VMergeIteratorContext* lhs, const VMergeIteratorContext* rhs) const {
@ -320,10 +303,10 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
_schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
_schema = &(*_origin_iters.begin())->schema();
for (auto iter : _origin_iters) {
std::unique_ptr<VMergeIteratorContext> ctx(new VMergeIteratorContext(iter));
auto ctx = std::make_unique<VMergeIteratorContext>(iter);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@ -347,7 +330,7 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
_merge_heap.pop();
// copy current row to block
ctx->copy_row_to(block);
ctx->copy_row(block);
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
@ -383,7 +366,7 @@ public:
const Schema& schema() const override { return *_schema; }
private:
std::unique_ptr<Schema> _schema;
const Schema* _schema = nullptr;
RowwiseIterator* _cur_iter = nullptr;
std::deque<RowwiseIterator*> _origin_iters;
};
@ -396,8 +379,8 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
for (auto iter : _origin_iters) {
RETURN_IF_ERROR(iter->init(opts));
}
_schema.reset(new Schema((*(_origin_iters.begin()))->schema()));
_cur_iter = *(_origin_iters.begin());
_schema = &_cur_iter->schema();
return Status::OK();
}