diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index 3cc8da46ca..317df0de5a 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -126,6 +126,8 @@ public: int32_t tablet_id = 0; }; +class RowwiseIterator; +using RowwiseIteratorUPtr = std::unique_ptr; class RowwiseIterator { public: RowwiseIterator() = default; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index e201d49e68..ba828b01be 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -133,10 +133,8 @@ bool TabletReader::_optimize_for_single_rowset( return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset; } -Status TabletReader::_capture_rs_readers(const ReaderParams& read_params, - std::vector* valid_rs_readers) { - const std::vector* rs_readers = &read_params.rs_readers; - if (rs_readers->empty()) { +Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) { + if (read_params.rs_readers.empty()) { return Status::InternalError("fail to acquire data sources. tablet={}", _tablet->full_name()); } @@ -229,8 +227,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.remaining_vconjunct_root = read_params.remaining_vconjunct_root; _reader_context.output_columns = &read_params.output_columns; - *valid_rs_readers = *rs_readers; - return Status::OK(); } diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h index 4168341738..8c0dec30e7 100644 --- a/be/src/olap/reader.h +++ b/be/src/olap/reader.h @@ -183,8 +183,7 @@ protected: Status _init_params(const ReaderParams& read_params); - Status _capture_rs_readers(const ReaderParams& read_params, - std::vector* valid_rs_readers); + Status _capture_rs_readers(const ReaderParams& read_params); bool _optimize_for_single_rowset(const std::vector& rs_readers); diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 2ba9e2fe85..b8bb29698e 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -50,7 +50,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) { } Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context, - std::vector* out_iters, + std::vector* out_iters, bool use_cache) { RETURN_NOT_OK(_rowset->load()); _context = read_context; @@ -172,7 +172,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context should_use_cache)); // create iterator for each segment - std::vector> seg_iterators; for (auto& seg_ptr : _segment_cache_handle.get_segments()) { std::unique_ptr iter; auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter); @@ -183,29 +182,22 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context if (iter->empty()) { continue; } - seg_iterators.push_back(std::move(iter)); - } - - std::vector iterators; - for (auto& owned_it : seg_iterators) { - auto st = owned_it->init(_read_options); + auto st = iter->init(_read_options); if (!st.ok()) { LOG(WARNING) << "failed to init iterator: " << st.to_string(); return Status::Error(); } - // transfer ownership of segment iterator to `_iterator` - out_iters->push_back(owned_it.release()); + out_iters->push_back(std::move(iter)); } return Status::OK(); } Status BetaRowsetReader::init(RowsetReaderContext* read_context) { _context = read_context; - std::vector iterators; + std::vector iterators; RETURN_NOT_OK(get_segment_iterators(_context, &iterators)); // merge or union segment iterator - RowwiseIterator* final_iterator; if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { auto sequence_loc = -1; if (read_context->sequence_id_idx != -1) { @@ -216,23 +208,23 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) { } } } - final_iterator = vectorized::new_merge_iterator( - iterators, sequence_loc, read_context->is_unique, + _iterator = vectorized::new_merge_iterator( + std::move(iterators), sequence_loc, read_context->is_unique, read_context->read_orderby_key_reverse, read_context->merged_rows); } else { if (read_context->read_orderby_key_reverse) { // reverse iterators to read backward for ORDER BY key DESC std::reverse(iterators.begin(), iterators.end()); } - final_iterator = vectorized::new_union_iterator(iterators); + _iterator = vectorized::new_union_iterator(std::move(iterators)); } - auto s = final_iterator->init(_read_options); + auto s = _iterator->init(_read_options); if (!s.ok()) { LOG(WARNING) << "failed to init iterator: " << s.to_string(); + _iterator.reset(); return Status::Error(); } - _iterator.reset(final_iterator); return Status::OK(); } diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h index e5223e5241..eba2c2885c 100644 --- a/be/src/olap/rowset/beta_rowset_reader.h +++ b/be/src/olap/rowset/beta_rowset_reader.h @@ -34,7 +34,7 @@ public: Status init(RowsetReaderContext* read_context) override; Status get_segment_iterators(RowsetReaderContext* read_context, - std::vector* out_iters, + std::vector* out_iters, bool use_cache = false) override; void reset_read_options() override; Status next_block(vectorized::Block* block) override; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 7c19b16a18..24602f36ec 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -116,7 +116,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) { return _add_block(block, &_segment_writer); } -vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader( +RowwiseIteratorUPtr BetaRowsetWriter::_get_segcompaction_reader( SegCompactionCandidatesSharedPtr segments, std::shared_ptr schema, OlapReaderStatistics* stat, uint64_t* merged_row_stat) { StorageReadOptions read_options; @@ -133,26 +133,18 @@ vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader( } seg_iterators.push_back(std::move(iter)); } - std::vector iterators; - for (auto& owned_it : seg_iterators) { - // transfer ownership - iterators.push_back(owned_it.release()); - } bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS); bool is_reverse = false; - auto merge_itr = - vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat); + auto merge_itr = vectorized::new_merge_iterator(std::move(seg_iterators), -1, is_unique, + is_reverse, merged_row_stat); DCHECK(merge_itr); auto s = merge_itr->init(read_options); if (!s.ok()) { LOG(WARNING) << "failed to init iterator: " << s.to_string(); - for (auto& itr : iterators) { - delete itr; - } return nullptr; } - return (vectorized::VMergeIterator*)merge_itr; + return merge_itr; } std::unique_ptr BetaRowsetWriter::_create_segcompaction_writer( @@ -283,14 +275,11 @@ Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr s _context.tablet_schema->columns().size()); std::unique_ptr stat(new OlapReaderStatistics()); uint64_t merged_row_stat = 0; - vectorized::VMergeIterator* reader = - _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat); - if (UNLIKELY(reader == nullptr)) { + auto reader_ptr = _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat); + if (UNLIKELY(reader_ptr == nullptr)) { LOG(WARNING) << "failed to get segcompaction reader"; return Status::Error(); } - std::unique_ptr reader_ptr; - reader_ptr.reset(reader); auto writer = _create_segcompaction_writer(begin, end); if (UNLIKELY(writer == nullptr)) { LOG(WARNING) << "failed to get segcompaction writer"; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 7f800b3199..f0d29fba30 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -107,10 +107,10 @@ private: void _build_rowset_meta(std::shared_ptr rowset_meta); Status _segcompaction_if_necessary(); Status _segcompaction_ramaining_if_necessary(); - vectorized::VMergeIterator* _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments, - std::shared_ptr schema, - OlapReaderStatistics* stat, - uint64_t* merged_row_stat); + RowwiseIteratorUPtr _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments, + std::shared_ptr schema, + OlapReaderStatistics* stat, + uint64_t* merged_row_stat); std::unique_ptr _create_segcompaction_writer(uint64_t begin, uint64_t end); Status _delete_original_segments(uint32_t begin, uint32_t end); diff --git a/be/src/olap/rowset/rowset_reader.h b/be/src/olap/rowset/rowset_reader.h index 4d979c6e21..795f841b0a 100644 --- a/be/src/olap/rowset/rowset_reader.h +++ b/be/src/olap/rowset/rowset_reader.h @@ -44,7 +44,7 @@ public: virtual Status init(RowsetReaderContext* read_context) = 0; virtual Status get_segment_iterators(RowsetReaderContext* read_context, - std::vector* out_iters, + std::vector* out_iters, bool use_cache = false) = 0; virtual void reset_read_options() = 0; diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp index 1e81c86e08..77246995c5 100644 --- a/be/src/olap/rowset/segment_v2/segment.cpp +++ b/be/src/olap/rowset/segment_v2/segment.cpp @@ -135,8 +135,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea } else { iter->reset(new SegmentIterator(this->shared_from_this(), schema)); } - iter->get()->init(read_options); - return Status::OK(); + return iter->get()->init(read_options); } Status Segment::_parse_footer() { diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 78e271625e..5b95ac9335 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -58,10 +58,8 @@ bool BlockReader::_rowsets_overlapping(const std::vector& } return false; } -Status BlockReader::_init_collect_iter(const ReaderParams& read_params, - std::vector* valid_rs_readers) { - std::vector rs_readers; - auto res = _capture_rs_readers(read_params, &rs_readers); +Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { + auto res = _capture_rs_readers(read_params); if (!res.ok()) { LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res << ", tablet_id:" << read_params.tablet->tablet_id() @@ -71,13 +69,14 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params, return res; } // check if rowsets are noneoverlapping - _is_rowsets_overlapping = _rowsets_overlapping(rs_readers); + _is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers); _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, read_params.read_orderby_key_reverse); _reader_context.is_vec = true; _reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt; - for (auto& rs_reader : rs_readers) { + std::vector valid_rs_readers; + for (auto& rs_reader : read_params.rs_readers) { // _vcollect_iter.topn_next() will init rs_reader by itself if (!_vcollect_iter.use_topn_next()) { RETURN_NOT_OK(rs_reader->init(&_reader_context)); @@ -88,11 +87,11 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params, return res; } if (res.ok()) { - valid_rs_readers->push_back(rs_reader); + valid_rs_readers.push_back(rs_reader); } } - RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers)); + RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers)); // _vcollect_iter.topn_next() can not use current_row if (!_vcollect_iter.use_topn_next()) { auto status = _vcollect_iter.current_row(&_next_row); @@ -157,8 +156,7 @@ Status BlockReader::init(const ReaderParams& read_params) { } } - std::vector rs_readers; - auto status = _init_collect_iter(read_params, &rs_readers); + auto status = _init_collect_iter(read_params); if (!status.ok()) { return status; } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 5b374f9859..4669622c65 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -62,8 +62,7 @@ private: // to minimize the comparison time in merge heap. Status _unique_key_next_block(Block* block, bool* eof); - Status _init_collect_iter(const ReaderParams& read_params, - std::vector* valid_rs_readers); + Status _init_collect_iter(const ReaderParams& read_params); void _init_agg_state(const ReaderParams& read_params); diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 13eb7831ee..e33615393e 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -36,11 +36,10 @@ VerticalBlockReader::~VerticalBlockReader() { } Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_params, - std::vector* segment_iters, + std::vector* segment_iters, std::vector* iterator_init_flag, std::vector* rowset_ids) { - std::vector rs_readers; - auto res = _capture_rs_readers(read_params, &rs_readers); + auto res = _capture_rs_readers(read_params); if (!res.ok()) { LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res << ", tablet_id:" << read_params.tablet->tablet_id() @@ -51,7 +50,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para } _reader_context.is_vec = true; _reader_context.is_vertical_compaction = true; - for (auto& rs_reader : rs_readers) { + for (auto& rs_reader : read_params.rs_readers) { // segment iterator will be inited here // In vertical compaction, every group will load segment so we should cache // segment to avoid tot many s3 head request @@ -84,7 +83,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) { // get segment iterators - std::vector segment_iters; + std::vector segment_iters; std::vector iterator_init_flag; std::vector rowset_ids; RETURN_IF_ERROR( @@ -99,11 +98,11 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) seq_col_idx = read_params.tablet->tablet_schema()->sequence_col_idx(); } _vcollect_iter = new_vertical_heap_merge_iterator( - segment_iters, iterator_init_flag, rowset_ids, ori_return_col_size, + std::move(segment_iters), iterator_init_flag, rowset_ids, ori_return_col_size, read_params.tablet->keys_type(), seq_col_idx, _row_sources_buffer); } else { - _vcollect_iter = new_vertical_mask_merge_iterator(segment_iters, ori_return_col_size, - _row_sources_buffer); + _vcollect_iter = new_vertical_mask_merge_iterator(std::move(segment_iters), + ori_return_col_size, _row_sources_buffer); } // init collect iterator StorageReadOptions opts; diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index b259e5bebe..1cd8161c4c 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -65,7 +65,7 @@ private: Status _init_collect_iter(const ReaderParams& read_params); Status _get_segment_iterators(const ReaderParams& read_params, - std::vector* segment_iters, + std::vector* segment_iters, std::vector* iterator_init_flag, std::vector* rowset_ids); diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 1f86ca81a2..c715ceb78b 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -463,9 +463,9 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { // will not be pushed into heap, we should init next one util we find a valid iter // so this rowset can work in heap bool pre_iter_invalid = false; - for (auto iter : _origin_iters) { + for (auto& iter : _origin_iters) { VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext( - iter, _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx); + std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx); _ori_iter_ctx.push_back(ctx); if (_iterator_init_flags[seg_order] || pre_iter_invalid) { RETURN_IF_ERROR(ctx->init(opts)); @@ -600,9 +600,9 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { _opts = opts; RowsetId rs_id; - for (auto iter : _origin_iters) { - auto ctx = std::make_unique(iter, rs_id, _ori_return_cols, -1, - -1); + for (auto& iter : _origin_iters) { + auto ctx = std::make_unique(std::move(iter), rs_id, + _ori_return_cols, -1, -1); _origin_iter_ctx.emplace_back(ctx.release()); } _origin_iters.clear(); @@ -613,7 +613,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { // interfaces to create vertical merge iterator std::shared_ptr new_vertical_heap_merge_iterator( - std::vector inputs, const std::vector& iterator_init_flag, + std::vector&& inputs, const std::vector& iterator_init_flag, const std::vector& rowset_ids, size_t ori_return_cols, KeysType keys_type, uint32_t seq_col_idx, RowSourcesBuffer* row_sources) { return std::make_shared(std::move(inputs), iterator_init_flag, @@ -622,9 +622,10 @@ std::shared_ptr new_vertical_heap_merge_iterator( } std::shared_ptr new_vertical_mask_merge_iterator( - const std::vector& inputs, size_t ori_return_cols, + std::vector&& inputs, size_t ori_return_cols, RowSourcesBuffer* row_sources) { - return std::make_shared(inputs, ori_return_cols, row_sources); + return std::make_shared(std::move(inputs), ori_return_cols, + row_sources); } } // namespace vectorized diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index 822000a2bc..a45b983288 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -129,24 +129,21 @@ private: // takes ownership of rowwise iterator class VerticalMergeIteratorContext { public: - VerticalMergeIteratorContext(RowwiseIterator* iter, RowsetId rowset_id, size_t ori_return_cols, - uint32_t order, uint32_t seq_col_idx) - : _iter(iter), + VerticalMergeIteratorContext(RowwiseIteratorUPtr&& iter, RowsetId rowset_id, + size_t ori_return_cols, uint32_t order, uint32_t seq_col_idx) + : _iter(std::move(iter)), _rowset_id(rowset_id), _ori_return_cols(ori_return_cols), _order(order), _seq_col_idx(seq_col_idx), - _num_key_columns(iter->schema().num_key_columns()) {} + _num_key_columns(_iter->schema().num_key_columns()) {} VerticalMergeIteratorContext(const VerticalMergeIteratorContext&) = delete; VerticalMergeIteratorContext(VerticalMergeIteratorContext&&) = delete; VerticalMergeIteratorContext& operator=(const VerticalMergeIteratorContext&) = delete; VerticalMergeIteratorContext& operator=(VerticalMergeIteratorContext&&) = delete; - ~VerticalMergeIteratorContext() { - delete _iter; - _iter = nullptr; - } + ~VerticalMergeIteratorContext() {} Status block_reset(const std::shared_ptr& block); Status init(const StorageReadOptions& opts); bool compare(const VerticalMergeIteratorContext& rhs) const; @@ -188,7 +185,7 @@ private: // Load next block into _block Status _load_next_block(); - RowwiseIterator* _iter; + RowwiseIteratorUPtr _iter; RowsetId _rowset_id; size_t _ori_return_cols = 0; @@ -219,7 +216,7 @@ private: class VerticalHeapMergeIterator : public RowwiseIterator { public: // VerticalMergeIterator takes the ownership of input iterators - VerticalHeapMergeIterator(std::vector iters, + VerticalHeapMergeIterator(std::vector&& iters, std::vector iterator_init_flags, std::vector rowset_ids, size_t ori_return_cols, KeysType keys_type, int32_t seq_col_idx, @@ -255,7 +252,7 @@ private: private: // It will be released after '_merge_heap' has been built. - std::vector _origin_iters; + std::vector _origin_iters; std::vector _iterator_init_flags; std::vector _rowset_ids; size_t _ori_return_cols; @@ -289,7 +286,7 @@ private: class VerticalMaskMergeIterator : public RowwiseIterator { public: // VerticalMaskMergeIterator takes the ownership of input iterators - VerticalMaskMergeIterator(std::vector iters, size_t ori_return_cols, + VerticalMaskMergeIterator(std::vector&& iters, size_t ori_return_cols, RowSourcesBuffer* row_sources_buf) : _origin_iters(std::move(iters)), _ori_return_cols(ori_return_cols), @@ -318,7 +315,7 @@ private: private: // released after build ctx - std::vector _origin_iters; + std::vector _origin_iters; size_t _ori_return_cols = 0; std::vector _origin_iter_ctx; @@ -332,12 +329,12 @@ private: // segment merge iterator std::shared_ptr new_vertical_heap_merge_iterator( - std::vector inputs, const std::vector& iterator_init_flag, + std::vector&& inputs, const std::vector& iterator_init_flag, const std::vector& rowset_ids, size_t _ori_return_cols, KeysType key_type, uint32_t seq_col_idx, RowSourcesBuffer* row_sources_buf); std::shared_ptr new_vertical_mask_merge_iterator( - const std::vector& inputs, size_t ori_return_cols, + std::vector&& inputs, size_t ori_return_cols, RowSourcesBuffer* row_sources_buf); } // namespace vectorized diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index db38ebe6c4..7e1d42b20f 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -317,12 +317,13 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } - _schema = &(*_origin_iters.begin())->schema(); + _schema = &(_origin_iters[0]->schema()); _record_rowids = opts.record_rowids; - for (auto iter : _origin_iters) { - auto ctx = std::make_unique( - iter, _sequence_id_idx, _is_unique, _is_reverse, opts.read_orderby_key_columns); + for (auto& iter : _origin_iters) { + auto ctx = std::make_unique(std::move(iter), _sequence_id_idx, + _is_unique, _is_reverse, + opts.read_orderby_key_columns); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; @@ -343,12 +344,9 @@ public: // Iterators' ownership it transferred to this class. // This class will delete all iterators when destructs // Client should not use iterators anymore. - VUnionIterator(std::vector& v) : _origin_iters(v.begin(), v.end()) {} + VUnionIterator(std::vector&& v) : _origin_iters(std::move(v)) {} - ~VUnionIterator() override { - std::for_each(_origin_iters.begin(), _origin_iters.end(), - std::default_delete()); - } + ~VUnionIterator() override {} Status init(const StorageReadOptions& opts) override; @@ -368,7 +366,7 @@ public: private: const Schema* _schema = nullptr; RowwiseIterator* _cur_iter = nullptr; - std::deque _origin_iters; + std::vector _origin_iters; }; Status VUnionIterator::init(const StorageReadOptions& opts) { @@ -376,10 +374,10 @@ Status VUnionIterator::init(const StorageReadOptions& opts) { return Status::OK(); } - for (auto iter : _origin_iters) { + for (auto& iter : _origin_iters) { RETURN_IF_ERROR(iter->init(opts)); } - _cur_iter = *(_origin_iters.begin()); + _cur_iter = _origin_iters.back().get(); _schema = &_cur_iter->schema(); return Status::OK(); } @@ -388,10 +386,9 @@ Status VUnionIterator::next_batch(Block* block) { while (_cur_iter != nullptr) { auto st = _cur_iter->next_batch(block); if (st.is()) { - delete _cur_iter; - _origin_iters.pop_front(); + _origin_iters.pop_back(); if (!_origin_iters.empty()) { - _cur_iter = *(_origin_iters.begin()); + _cur_iter = _origin_iters.back().get(); } else { _cur_iter = nullptr; } @@ -410,27 +407,29 @@ Status VUnionIterator::current_block_row_locations(std::vector* loc return _cur_iter->current_block_row_locations(locations); } -RowwiseIterator* new_merge_iterator(std::vector& inputs, int sequence_id_idx, - bool is_unique, bool is_reverse, uint64_t* merged_rows) { +RowwiseIteratorUPtr new_merge_iterator(std::vector&& inputs, + int sequence_id_idx, bool is_unique, bool is_reverse, + uint64_t* merged_rows) { if (inputs.size() == 1) { - return *(inputs.begin()); + return std::move(inputs[0]); } - return new VMergeIterator(inputs, sequence_id_idx, is_unique, is_reverse, merged_rows); + return std::make_unique(std::move(inputs), sequence_id_idx, is_unique, + is_reverse, merged_rows); } -RowwiseIterator* new_union_iterator(std::vector& inputs) { +RowwiseIteratorUPtr new_union_iterator(std::vector&& inputs) { if (inputs.size() == 1) { - return *(inputs.begin()); + return std::move(inputs[0]); } - return new VUnionIterator(inputs); + return std::make_unique(std::move(inputs)); } RowwiseIterator* new_vstatistics_iterator(std::shared_ptr segment, const Schema& schema) { return new VStatisticsIterator(segment, schema); } -RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows) { - return new VAutoIncrementIterator(schema, num_rows); +RowwiseIteratorUPtr new_auto_increment_iterator(const Schema& schema, size_t num_rows) { + return std::make_unique(schema, num_rows); } } // namespace vectorized diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index 03ddbee8bc..a59f65678c 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -65,14 +65,14 @@ private: // } class VMergeIteratorContext { public: - VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique, + VMergeIteratorContext(RowwiseIteratorUPtr&& iter, int sequence_id_idx, bool is_unique, bool is_reverse, std::vector* read_orderby_key_columns) - : _iter(iter), + : _iter(std::move(iter)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _is_reverse(is_reverse), - _num_columns(iter->schema().num_column_ids()), - _num_key_columns(iter->schema().num_key_columns()), + _num_columns(_iter->schema().num_column_ids()), + _num_key_columns(_iter->schema().num_key_columns()), _compare_columns(read_orderby_key_columns) {} VMergeIteratorContext(const VMergeIteratorContext&) = delete; @@ -80,10 +80,7 @@ public: VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete; VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete; - ~VMergeIteratorContext() { - delete _iter; - _iter = nullptr; - } + ~VMergeIteratorContext() {} Status block_reset(const std::shared_ptr& block); @@ -143,7 +140,7 @@ private: // Load next block into _block Status _load_next_block(); - RowwiseIterator* _iter; + RowwiseIteratorUPtr _iter; int _sequence_id_idx = -1; bool _is_unique = false; @@ -171,9 +168,9 @@ private: class VMergeIterator : public RowwiseIterator { public: // VMergeIterator takes the ownership of input iterators - VMergeIterator(std::vector& iters, int sequence_id_idx, bool is_unique, + VMergeIterator(std::vector&& iters, int sequence_id_idx, bool is_unique, bool is_reverse, uint64_t* merged_rows) - : _origin_iters(iters), + : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _is_reverse(is_reverse), @@ -204,7 +201,7 @@ public: bool update_profile(RuntimeProfile* profile) override { if (!_origin_iters.empty()) { - return (*_origin_iters.begin())->update_profile(profile); + return _origin_iters[0]->update_profile(profile); } return false; } @@ -277,7 +274,7 @@ private: } // It will be released after '_merge_heap' has been built. - std::vector _origin_iters; + std::vector _origin_iters; const Schema* _schema = nullptr; @@ -308,21 +305,21 @@ private: // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector& inputs, int sequence_id_idx, - bool is_unique, bool is_reverse, uint64_t* merged_rows); +RowwiseIteratorUPtr new_merge_iterator(std::vector&& inputs, + int sequence_id_idx, bool is_unique, bool is_reverse, + uint64_t* merged_rows); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. // -// Inputs iterators' ownership is taken by created union iterator. And client -// should delete returned iterator after usage. -RowwiseIterator* new_union_iterator(std::vector& inputs); +// Inputs iterators' ownership is taken by created union iterator. +RowwiseIteratorUPtr new_union_iterator(std::vector&& inputs); // Create an auto increment iterator which returns num_rows data in format of schema. // This class aims to be used in unit test. // // Client should delete returned iterator. -RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows); +RowwiseIteratorUPtr new_auto_increment_iterator(const Schema& schema, size_t num_rows); RowwiseIterator* new_vstatistics_iterator(std::shared_ptr segment, const Schema& schema); diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp b/be/test/vec/exec/vgeneric_iterators_test.cpp index f51a85b25c..90a755261c 100644 --- a/be/test/vec/exec/vgeneric_iterators_test.cpp +++ b/be/test/vec/exec/vgeneric_iterators_test.cpp @@ -86,19 +86,17 @@ TEST(VGenericIteratorsTest, AutoIncrement) { EXPECT_EQ(row_count + 2, (*c2)[i].get()); row_count++; } - - delete iter; } TEST(VGenericIteratorsTest, Union) { auto schema = create_schema(); - std::vector inputs; + std::vector inputs; inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300)); - auto iter = vectorized::new_union_iterator(inputs); + auto iter = vectorized::new_union_iterator(std::move(inputs)); StorageReadOptions opts; auto st = iter->init(opts); EXPECT_TRUE(st.ok()); @@ -117,34 +115,30 @@ TEST(VGenericIteratorsTest, Union) { auto c1 = block.get_by_position(1).column; auto c2 = block.get_by_position(2).column; - size_t row_count = 0; - for (size_t i = 0; i < block.rows(); ++i) { - size_t base_value = row_count; - if (row_count >= 300) { + for (int i = 0; i < block.rows(); ++i) { + size_t base_value = i; + if (i >= 500) { + base_value -= 500; + } else if (i >= 300) { base_value -= 300; - } else if (row_count >= 100) { - base_value -= 100; } EXPECT_EQ(base_value, (*c0)[i].get()); EXPECT_EQ(base_value + 1, (*c1)[i].get()); EXPECT_EQ(base_value + 2, (*c2)[i].get()); - row_count++; } - - delete iter; } TEST(VGenericIteratorsTest, MergeAgg) { EXPECT_TRUE(1); auto schema = create_schema(); - std::vector inputs; + std::vector inputs; inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300)); - auto iter = vectorized::new_merge_iterator(inputs, -1, false, false, nullptr); + auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false, false, nullptr); StorageReadOptions opts; auto st = iter->init(opts); EXPECT_TRUE(st.ok()); @@ -180,20 +174,18 @@ TEST(VGenericIteratorsTest, MergeAgg) { EXPECT_EQ(base_value + 2, (*c2)[i].get()); row_count++; } - - delete iter; } TEST(VGenericIteratorsTest, MergeUnique) { EXPECT_TRUE(1); auto schema = create_schema(); - std::vector inputs; + std::vector inputs; inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300)); - auto iter = vectorized::new_merge_iterator(inputs, -1, true, false, nullptr); + auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true, false, nullptr); StorageReadOptions opts; auto st = iter->init(opts); EXPECT_TRUE(st.ok()); @@ -221,8 +213,6 @@ TEST(VGenericIteratorsTest, MergeUnique) { EXPECT_EQ(base_value + 2, (*c2)[i].get()); row_count++; } - - delete iter; } // only used for Seq Column UT @@ -300,7 +290,7 @@ public: TEST(VGenericIteratorsTest, MergeWithSeqColumn) { EXPECT_TRUE(1); auto schema = create_schema(); - std::vector inputs; + std::vector inputs; int seq_column_id = 2; int seg_iter_num = 10; @@ -311,11 +301,12 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) { // input seg file in Ascending, expect output seq column in Descending for (int i = 0; i < seg_iter_num; i++) { int seq_id_in_every_file = i; - inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin, seq_column_id, - seq_id_in_every_file)); + inputs.push_back(std::make_unique( + schema, num_rows, rows_begin, seq_column_id, seq_id_in_every_file)); } - auto iter = vectorized::new_merge_iterator(inputs, seq_column_id, true, false, nullptr); + auto iter = + vectorized::new_merge_iterator(std::move(inputs), seq_column_id, true, false, nullptr); StorageReadOptions opts; auto st = iter->init(opts); EXPECT_TRUE(st.ok()); @@ -335,8 +326,6 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) { auto seq_col = block.get_by_position(seq_column_id).column; size_t actual_value = (*seq_col)[0].get(); EXPECT_EQ(seg_iter_num - 1, actual_value); - - delete iter; } } // namespace vectorized