[fix](reader) fix leak in Level1Iteartor (#23612)

_merge_next() and _normal_next() leak _cur_child when _cur_child->next()
returns failure.
This commit is contained in:
walter
2023-08-29 23:32:24 +08:00
committed by GitHub
parent 030df6db35
commit d1dbe7bfc8
2 changed files with 47 additions and 65 deletions

View File

@ -58,11 +58,7 @@ namespace vectorized {
} \
} while (false)
VCollectIterator::~VCollectIterator() {
for (auto child : _children) {
delete child;
}
}
VCollectIterator::~VCollectIterator() = default;
void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
bool is_reverse) {
@ -102,8 +98,7 @@ Status VCollectIterator::add_child(const RowSetSplits& rs_splits) {
return Status::OK();
}
std::unique_ptr<LevelIterator> child(new Level0Iterator(rs_splits.rs_reader, _reader));
_children.push_back(child.release());
_children.push_back(std::make_unique<Level0Iterator>(rs_splits.rs_reader, _reader));
return Status::OK();
}
@ -127,7 +122,6 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
c_iter != _children.end();) {
auto s = (*c_iter)->init(have_multiple_child);
if (!s.ok()) {
delete (*c_iter);
c_iter = _children.erase(c_iter);
r_iter = rs_readers.erase(r_iter);
if (!s.is<END_OF_FILE>()) {
@ -156,39 +150,38 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
auto base_reader_child = _children.begin();
std::advance(base_reader_child, base_reader_idx);
std::list<LevelIterator*> cumu_children;
std::list<std::unique_ptr<LevelIterator>> cumu_children;
for (auto iter = _children.begin(); iter != _children.end();) {
if (iter != base_reader_child) {
cumu_children.push_back(*iter);
cumu_children.push_back(std::move(*iter));
iter = _children.erase(iter);
} else {
++iter;
}
}
auto cumu_iter = std::make_unique<Level1Iterator>(
cumu_children, _reader, cumu_children.size() > 1, _is_reverse, _skip_same);
bool is_merge = cumu_children.size() > 1;
std::unique_ptr<LevelIterator> cumu_iter = std::make_unique<Level1Iterator>(
std::move(cumu_children), _reader, is_merge, _is_reverse, _skip_same);
RETURN_IF_NOT_EOF_AND_OK(cumu_iter->init());
std::list<LevelIterator*> children;
children.push_back(*base_reader_child);
children.push_back(cumu_iter.get());
auto level1_iter =
new Level1Iterator(children, _reader, _merge, _is_reverse, _skip_same);
cumu_iter.release();
_inner_iter.reset(level1_iter);
std::list<std::unique_ptr<LevelIterator>> children;
children.push_back(std::move(*base_reader_child));
children.push_back(std::move(cumu_iter));
_inner_iter.reset(new Level1Iterator(std::move(children), _reader, _merge, _is_reverse,
_skip_same));
// need to clear _children here, or else if the following _inner_iter->init() return early
// base_reader_child will be double deleted
_children.clear();
} else {
// _children.size() == 1
_inner_iter.reset(
new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same));
_inner_iter.reset(new Level1Iterator(std::move(_children), _reader, _merge, _is_reverse,
_skip_same));
}
} else {
auto level1_iter = std::make_unique<Level1Iterator>(_children, _reader, _merge, _is_reverse,
_skip_same);
auto level1_iter = std::make_unique<Level1Iterator>(std::move(_children), _reader, _merge,
_is_reverse, _skip_same);
_children.clear();
RETURN_IF_ERROR(level1_iter->init_level0_iterators_for_union());
_inner_iter.reset(level1_iter.release());
_inner_iter = std::move(level1_iter);
}
RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
// Clear _children earlier to release any related references
@ -592,10 +585,10 @@ Status VCollectIterator::Level0Iterator::current_block_row_locations(
}
VCollectIterator::Level1Iterator::Level1Iterator(
const std::list<VCollectIterator::LevelIterator*>& children, TabletReader* reader,
std::list<std::unique_ptr<VCollectIterator::LevelIterator>> children, TabletReader* reader,
bool merge, bool is_reverse, bool skip_same)
: LevelIterator(reader),
_children(children),
_children(std::move(children)),
_reader(reader),
_merge(merge),
_is_reverse(is_reverse),
@ -608,20 +601,10 @@ VCollectIterator::Level1Iterator::Level1Iterator(
}
VCollectIterator::Level1Iterator::~Level1Iterator() {
for (auto child : _children) {
if (child != nullptr) {
delete child;
child = nullptr;
}
}
if (_heap) {
while (!_heap->empty()) {
auto child = _heap->top();
delete _heap->top();
_heap->pop();
if (child) {
delete child;
}
}
}
}
@ -681,18 +664,20 @@ Status VCollectIterator::Level1Iterator::init(bool get_data_by_ref) {
}
}
_heap.reset(new MergeHeap {LevelIteratorComparator(sequence_loc, _is_reverse)});
for (auto child : _children) {
for (auto&& child : _children) {
DCHECK(child != nullptr);
//DCHECK(child->current_row().ok());
_heap->push(child);
_heap->push(child.release());
}
_cur_child = _heap->top();
_cur_child.reset(_heap->top());
_heap->pop();
// Clear _children earlier to release any related references
_children.clear();
} else {
_merge = false;
_heap.reset(nullptr);
_cur_child = *_children.begin();
_cur_child = std::move(*_children.begin());
_children.pop_front();
}
_ref = *_cur_child->current_row_ref();
return Status::OK();
@ -704,7 +689,6 @@ Status VCollectIterator::Level1Iterator::init_level0_iterators_for_union() {
for (auto iter = _children.begin(); iter != _children.end();) {
auto s = (*iter)->init_for_union(is_first_child, have_multiple_child);
if (!s.ok()) {
delete (*iter);
iter = _children.erase(iter);
if (!s.is<END_OF_FILE>()) {
return s;
@ -720,24 +704,24 @@ Status VCollectIterator::Level1Iterator::init_level0_iterators_for_union() {
}
Status VCollectIterator::Level1Iterator::_merge_next(IteratorRowRef* ref) {
_heap->pop();
auto res = _cur_child->next(ref);
if (LIKELY(res.ok())) {
_heap->push(_cur_child);
_cur_child = _heap->top();
_heap->push(_cur_child.release());
_cur_child.reset(_heap->top());
_heap->pop();
} else if (res.is<END_OF_FILE>()) {
// current child has been read, to read next
delete _cur_child;
if (!_heap->empty()) {
_cur_child = _heap->top();
_cur_child.reset(_heap->top());
_heap->pop();
} else {
_ref.reset();
_cur_child = nullptr;
_cur_child.reset();
return Status::Error<END_OF_FILE>("");
}
} else {
_ref.reset();
_cur_child = nullptr;
_cur_child.reset();
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
}
@ -763,17 +747,16 @@ Status VCollectIterator::Level1Iterator::_normal_next(IteratorRowRef* ref) {
return Status::OK();
} else if (res.is<END_OF_FILE>()) {
// current child has been read, to read next
delete _cur_child;
_children.pop_front();
if (!_children.empty()) {
_cur_child = *(_children.begin());
_cur_child = std::move(*(_children.begin()));
_children.pop_front();
return _normal_next(ref);
} else {
_cur_child = nullptr;
_cur_child.reset();
return Status::Error<END_OF_FILE>("");
}
} else {
_cur_child = nullptr;
_cur_child.reset();
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
}
@ -861,17 +844,16 @@ Status VCollectIterator::Level1Iterator::_normal_next(Block* block) {
return Status::OK();
} else if (res.is<END_OF_FILE>()) {
// current child has been read, to read next
delete _cur_child;
_children.pop_front();
if (!_children.empty()) {
_cur_child = *(_children.begin());
_cur_child = std::move(*(_children.begin()));
_children.pop_front();
return _normal_next(block);
} else {
_cur_child = nullptr;
_cur_child.reset();
return Status::Error<END_OF_FILE>("");
}
} else {
_cur_child = nullptr;
_cur_child.reset();
LOG(WARNING) << "failed to get next from child, res=" << res;
return res;
}

View File

@ -258,8 +258,8 @@ private:
// Iterate from LevelIterators (maybe Level0Iterators or Level1Iterator or mixed)
class Level1Iterator : public LevelIterator {
public:
Level1Iterator(const std::list<LevelIterator*>& children, TabletReader* reader, bool merge,
bool is_reverse, bool skip_same);
Level1Iterator(std::list<std::unique_ptr<LevelIterator>> children, TabletReader* reader,
bool merge, bool is_reverse, bool skip_same);
Status init(bool get_data_by_ref = false) override;
@ -295,10 +295,10 @@ private:
// Each LevelIterator corresponds to a rowset reader,
// it will be cleared after '_heap' has been initialized when '_merge == true'.
std::list<LevelIterator*> _children;
std::list<std::unique_ptr<LevelIterator>> _children;
// point to the Level0Iterator containing the next output row.
// null when VCollectIterator hasn't been initialized or reaches EOF.
LevelIterator* _cur_child = nullptr;
std::unique_ptr<LevelIterator> _cur_child;
TabletReader* _reader = nullptr;
// when `_merge == true`, rowset reader returns ordered rows and VCollectIterator uses a priority queue to merge
@ -321,7 +321,7 @@ private:
// Each LevelIterator corresponds to a rowset reader,
// it will be cleared after '_inner_iter' has been initialized.
std::list<LevelIterator*> _children;
std::list<std::unique_ptr<LevelIterator>> _children;
bool _merge = true;
// reverse the compare order
@ -338,4 +338,4 @@ private:
};
} // namespace vectorized
} // namespace doris
} // namespace doris