diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index fe521c2705..5fbd1ed2c2 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -464,23 +464,22 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { _merge_heap.push(ctx); } else { // push next iterator in same rowset into heap - auto cur_order = ctx->order(); - while (cur_order + 1 < _iterator_init_flags.size() && - !_iterator_init_flags[cur_order + 1]) { - auto next_ctx = _ori_iter_ctx[cur_order + 1]; + size_t cur_order = ctx->order(); + for (size_t next_order = cur_order + 1; + next_order < _iterator_init_flags.size() && !_iterator_init_flags[next_order]; + ++next_order) { + auto& next_ctx = _ori_iter_ctx[next_order]; DCHECK(next_ctx); RETURN_IF_ERROR(next_ctx->init(_opts)); - if (!next_ctx->valid()) { - // next_ctx is empty segment, move to next - ++cur_order; - delete next_ctx; - continue; + if (next_ctx->valid()) { + _merge_heap.push(next_ctx.get()); + break; } - _merge_heap.push(next_ctx); - break; + // next_ctx is empty segment, move to next + next_ctx.reset(); } // Release ctx earlier to reduce resource consumed - delete ctx; + _ori_iter_ctx[cur_order].reset(); } } RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources)); @@ -501,7 +500,16 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { } _schema = &(*_origin_iters.begin())->schema(); - auto seg_order = 0; + size_t num_iters = _origin_iters.size(); + for (size_t seg_order = 0; seg_order < num_iters; ++seg_order) { + auto& iter = _origin_iters[seg_order]; + auto ctx = std::make_unique( + std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx, + _key_group_cluster_key_idxes); + _ori_iter_ctx.push_back(std::move(ctx)); + } + _origin_iters.clear(); + // Init contxt depends on _iterator_init_flags // for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator needs // to be inited and [0-2] is in same rowset. @@ -509,25 +517,18 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { // will not be pushed into heap, we should init next one util we find a valid iter // so this rowset can work in heap bool pre_iter_invalid = false; - for (auto& iter : _origin_iters) { - VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext( - std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx, - _key_group_cluster_key_idxes); - _ori_iter_ctx.push_back(ctx); - if (_iterator_init_flags[seg_order] || pre_iter_invalid) { + for (size_t i = 0; i < num_iters; ++i) { + if (_iterator_init_flags[i] || pre_iter_invalid) { + auto& ctx = _ori_iter_ctx[i]; RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { pre_iter_invalid = true; - ++seg_order; - delete ctx; continue; } - _merge_heap.push(ctx); + _merge_heap.push(ctx.get()); pre_iter_invalid = false; } - ++seg_order; } - _origin_iters.clear(); _opts = opts; _block_row_max = opts.block_row_max; @@ -629,7 +630,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { // ---------------- VerticalMaskMergeIterator ------------- // Status VerticalMaskMergeIterator::check_all_iter_finished() { - for (auto iter : _origin_iter_ctx) { + for (auto& iter : _origin_iter_ctx) { if (iter->inited()) { if (iter->valid()) { RETURN_IF_ERROR(iter->advance()); @@ -760,7 +761,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { for (auto& iter : _origin_iters) { auto ctx = std::make_unique(std::move(iter), rs_id, _ori_return_cols, -1, -1); - _origin_iter_ctx.emplace_back(ctx.release()); + _origin_iter_ctx.push_back(std::move(ctx)); } _origin_iters.clear(); diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index 760835e6d3..f46a0446cf 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -121,7 +121,7 @@ public: size_t same_source_count(uint16_t source, size_t limit); - // return continous agg_flag=true count from index + // return continuous agg_flag=true count from index size_t continuous_agg_count(uint64_t index); private: @@ -155,7 +155,7 @@ public: _order(order), _seq_col_idx(seq_col_idx), _num_key_columns(_iter->schema().num_key_columns()), - _key_group_cluster_key_idxes(key_group_cluster_key_idxes) {} + _key_group_cluster_key_idxes(std::move(key_group_cluster_key_idxes)) {} VerticalMergeIteratorContext(const VerticalMergeIteratorContext&) = delete; VerticalMergeIteratorContext(VerticalMergeIteratorContext&&) = delete; @@ -209,7 +209,7 @@ private: size_t _ori_return_cols = 0; // segment order, used to compare key - uint32_t _order = 0; + const uint32_t _order = 0; int32_t _seq_col_idx = -1; @@ -243,21 +243,17 @@ public: RowSourcesBuffer* row_sources_buf, std::vector key_group_cluster_key_idxes) : _origin_iters(std::move(iters)), - _iterator_init_flags(iterator_init_flags), - _rowset_ids(rowset_ids), + _iterator_init_flags(std::move(iterator_init_flags)), + _rowset_ids(std::move(rowset_ids)), _ori_return_cols(ori_return_cols), _keys_type(keys_type), _seq_col_idx(seq_col_idx), _row_sources_buf(row_sources_buf), - _key_group_cluster_key_idxes(key_group_cluster_key_idxes) {} + _key_group_cluster_key_idxes(std::move(key_group_cluster_key_idxes)) {} - ~VerticalHeapMergeIterator() override { - while (!_merge_heap.empty()) { - auto ctx = _merge_heap.top(); - _merge_heap.pop(); - delete ctx; - } - } + ~VerticalHeapMergeIterator() override = default; + VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete; + VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete; Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; @@ -292,7 +288,7 @@ private: VerticalMergeContextComparator>; VMergeHeap _merge_heap; - std::vector _ori_iter_ctx; + std::vector> _ori_iter_ctx; int _block_row_max = 0; KeysType _keys_type; int32_t _seq_col_idx = -1; @@ -314,14 +310,16 @@ public: KeysType keys_type, int32_t seq_col_idx, RowSourcesBuffer* row_sources_buf) : _origin_iters(std::move(iters)), - _iterator_init_flags(iterator_init_flags), - _rowset_ids(rowset_ids), + _iterator_init_flags(std::move(iterator_init_flags)), + _rowset_ids(std::move(rowset_ids)), _ori_return_cols(ori_return_cols), _keys_type(keys_type), _seq_col_idx(seq_col_idx), _row_sources_buf(row_sources_buf) {} ~VerticalFifoMergeIterator() override = default; + VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete; + VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete; Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; @@ -365,11 +363,9 @@ public: _ori_return_cols(ori_return_cols), _row_sources_buf(row_sources_buf) {} - ~VerticalMaskMergeIterator() override { - for (auto iter : _origin_iter_ctx) { - delete iter; - } - } + ~VerticalMaskMergeIterator() override = default; + VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete; + VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete; Status init(const StorageReadOptions& opts) override; @@ -392,7 +388,7 @@ private: std::vector _origin_iters; size_t _ori_return_cols = 0; - std::vector _origin_iter_ctx; + std::vector> _origin_iter_ctx; const Schema* _schema = nullptr;