[fix](iterator) Fix mem leak when initial iterator failed (#28480)

This commit is contained in:
walter
2023-12-16 10:49:05 +08:00
committed by GitHub
parent f770403cca
commit f741ce5b7b
2 changed files with 45 additions and 48 deletions

View File

@ -464,23 +464,22 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
_merge_heap.push(ctx);
} else {
// push next iterator in same rowset into heap
auto cur_order = ctx->order();
while (cur_order + 1 < _iterator_init_flags.size() &&
!_iterator_init_flags[cur_order + 1]) {
auto next_ctx = _ori_iter_ctx[cur_order + 1];
size_t cur_order = ctx->order();
for (size_t next_order = cur_order + 1;
next_order < _iterator_init_flags.size() && !_iterator_init_flags[next_order];
++next_order) {
auto& next_ctx = _ori_iter_ctx[next_order];
DCHECK(next_ctx);
RETURN_IF_ERROR(next_ctx->init(_opts));
if (!next_ctx->valid()) {
// next_ctx is empty segment, move to next
++cur_order;
delete next_ctx;
continue;
if (next_ctx->valid()) {
_merge_heap.push(next_ctx.get());
break;
}
_merge_heap.push(next_ctx);
break;
// next_ctx is empty segment, move to next
next_ctx.reset();
}
// Release ctx earlier to reduce resource consumed
delete ctx;
_ori_iter_ctx[cur_order].reset();
}
}
RETURN_IF_ERROR(_row_sources_buf->append(tmp_row_sources));
@ -501,7 +500,16 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
}
_schema = &(*_origin_iters.begin())->schema();
auto seg_order = 0;
size_t num_iters = _origin_iters.size();
for (size_t seg_order = 0; seg_order < num_iters; ++seg_order) {
auto& iter = _origin_iters[seg_order];
auto ctx = std::make_unique<VerticalMergeIteratorContext>(
std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx,
_key_group_cluster_key_idxes);
_ori_iter_ctx.push_back(std::move(ctx));
}
_origin_iters.clear();
// Init contxt depends on _iterator_init_flags
// for example, the vector is [1,0,0,1,1], mean that order 0,3,4 iterator needs
// to be inited and [0-2] is in same rowset.
@ -509,25 +517,18 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
// will not be pushed into heap, we should init next one util we find a valid iter
// so this rowset can work in heap
bool pre_iter_invalid = false;
for (auto& iter : _origin_iters) {
VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext(
std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx,
_key_group_cluster_key_idxes);
_ori_iter_ctx.push_back(ctx);
if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
for (size_t i = 0; i < num_iters; ++i) {
if (_iterator_init_flags[i] || pre_iter_invalid) {
auto& ctx = _ori_iter_ctx[i];
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
pre_iter_invalid = true;
++seg_order;
delete ctx;
continue;
}
_merge_heap.push(ctx);
_merge_heap.push(ctx.get());
pre_iter_invalid = false;
}
++seg_order;
}
_origin_iters.clear();
_opts = opts;
_block_row_max = opts.block_row_max;
@ -629,7 +630,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
// ---------------- VerticalMaskMergeIterator ------------- //
Status VerticalMaskMergeIterator::check_all_iter_finished() {
for (auto iter : _origin_iter_ctx) {
for (auto& iter : _origin_iter_ctx) {
if (iter->inited()) {
if (iter->valid()) {
RETURN_IF_ERROR(iter->advance());
@ -760,7 +761,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
for (auto& iter : _origin_iters) {
auto ctx = std::make_unique<VerticalMergeIteratorContext>(std::move(iter), rs_id,
_ori_return_cols, -1, -1);
_origin_iter_ctx.emplace_back(ctx.release());
_origin_iter_ctx.push_back(std::move(ctx));
}
_origin_iters.clear();

View File

@ -121,7 +121,7 @@ public:
size_t same_source_count(uint16_t source, size_t limit);
// return continous agg_flag=true count from index
// return continuous agg_flag=true count from index
size_t continuous_agg_count(uint64_t index);
private:
@ -155,7 +155,7 @@ public:
_order(order),
_seq_col_idx(seq_col_idx),
_num_key_columns(_iter->schema().num_key_columns()),
_key_group_cluster_key_idxes(key_group_cluster_key_idxes) {}
_key_group_cluster_key_idxes(std::move(key_group_cluster_key_idxes)) {}
VerticalMergeIteratorContext(const VerticalMergeIteratorContext&) = delete;
VerticalMergeIteratorContext(VerticalMergeIteratorContext&&) = delete;
@ -209,7 +209,7 @@ private:
size_t _ori_return_cols = 0;
// segment order, used to compare key
uint32_t _order = 0;
const uint32_t _order = 0;
int32_t _seq_col_idx = -1;
@ -243,21 +243,17 @@ public:
RowSourcesBuffer* row_sources_buf,
std::vector<uint32_t> key_group_cluster_key_idxes)
: _origin_iters(std::move(iters)),
_iterator_init_flags(iterator_init_flags),
_rowset_ids(rowset_ids),
_iterator_init_flags(std::move(iterator_init_flags)),
_rowset_ids(std::move(rowset_ids)),
_ori_return_cols(ori_return_cols),
_keys_type(keys_type),
_seq_col_idx(seq_col_idx),
_row_sources_buf(row_sources_buf),
_key_group_cluster_key_idxes(key_group_cluster_key_idxes) {}
_key_group_cluster_key_idxes(std::move(key_group_cluster_key_idxes)) {}
~VerticalHeapMergeIterator() override {
while (!_merge_heap.empty()) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
delete ctx;
}
}
~VerticalHeapMergeIterator() override = default;
VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete;
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
@ -292,7 +288,7 @@ private:
VerticalMergeContextComparator>;
VMergeHeap _merge_heap;
std::vector<VerticalMergeIteratorContext*> _ori_iter_ctx;
std::vector<std::unique_ptr<VerticalMergeIteratorContext>> _ori_iter_ctx;
int _block_row_max = 0;
KeysType _keys_type;
int32_t _seq_col_idx = -1;
@ -314,14 +310,16 @@ public:
KeysType keys_type, int32_t seq_col_idx,
RowSourcesBuffer* row_sources_buf)
: _origin_iters(std::move(iters)),
_iterator_init_flags(iterator_init_flags),
_rowset_ids(rowset_ids),
_iterator_init_flags(std::move(iterator_init_flags)),
_rowset_ids(std::move(rowset_ids)),
_ori_return_cols(ori_return_cols),
_keys_type(keys_type),
_seq_col_idx(seq_col_idx),
_row_sources_buf(row_sources_buf) {}
~VerticalFifoMergeIterator() override = default;
VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete;
Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
@ -365,11 +363,9 @@ public:
_ori_return_cols(ori_return_cols),
_row_sources_buf(row_sources_buf) {}
~VerticalMaskMergeIterator() override {
for (auto iter : _origin_iter_ctx) {
delete iter;
}
}
~VerticalMaskMergeIterator() override = default;
VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete;
Status init(const StorageReadOptions& opts) override;
@ -392,7 +388,7 @@ private:
std::vector<RowwiseIteratorUPtr> _origin_iters;
size_t _ori_return_cols = 0;
std::vector<VerticalMergeIteratorContext*> _origin_iter_ctx;
std::vector<std::unique_ptr<VerticalMergeIteratorContext>> _origin_iter_ctx;
const Schema* _schema = nullptr;