[improvement](rowset reader) fix possible memleak (#16680)

* [improvement](rowset reader) fix possible memleak

* fix be UT
This commit is contained in:
TengJianPing
2023-02-15 11:13:31 +08:00
committed by GitHub
parent db9319b881
commit 9b8c91e18c
18 changed files with 120 additions and 164 deletions

View File

@ -126,6 +126,8 @@ public:
int32_t tablet_id = 0;
};
class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
public:
RowwiseIterator() = default;

View File

@ -133,10 +133,8 @@ bool TabletReader::_optimize_for_single_rowset(
return !has_overlapping && nonoverlapping_count == 1 && !has_delete_rowset;
}
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
const std::vector<RowsetReaderSharedPtr>* rs_readers = &read_params.rs_readers;
if (rs_readers->empty()) {
Status TabletReader::_capture_rs_readers(const ReaderParams& read_params) {
if (read_params.rs_readers.empty()) {
return Status::InternalError("fail to acquire data sources. tablet={}",
_tablet->full_name());
}
@ -229,8 +227,6 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.remaining_vconjunct_root = read_params.remaining_vconjunct_root;
_reader_context.output_columns = &read_params.output_columns;
*valid_rs_readers = *rs_readers;
return Status::OK();
}

View File

@ -183,8 +183,7 @@ protected:
Status _init_params(const ReaderParams& read_params);
Status _capture_rs_readers(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
Status _capture_rs_readers(const ReaderParams& read_params);
bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers);

View File

@ -50,7 +50,7 @@ bool BetaRowsetReader::update_profile(RuntimeProfile* profile) {
}
Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIterator*>* out_iters,
std::vector<RowwiseIteratorUPtr>* out_iters,
bool use_cache) {
RETURN_NOT_OK(_rowset->load());
_context = read_context;
@ -172,7 +172,6 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
should_use_cache));
// create iterator for each segment
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
for (auto& seg_ptr : _segment_cache_handle.get_segments()) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(*_input_schema, _read_options, &iter);
@ -183,29 +182,22 @@ Status BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
if (iter->empty()) {
continue;
}
seg_iterators.push_back(std::move(iter));
}
std::vector<RowwiseIterator*> iterators;
for (auto& owned_it : seg_iterators) {
auto st = owned_it->init(_read_options);
auto st = iter->init(_read_options);
if (!st.ok()) {
LOG(WARNING) << "failed to init iterator: " << st.to_string();
return Status::Error<ROWSET_READER_INIT>();
}
// transfer ownership of segment iterator to `_iterator`
out_iters->push_back(owned_it.release());
out_iters->push_back(std::move(iter));
}
return Status::OK();
}
Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
_context = read_context;
std::vector<RowwiseIterator*> iterators;
std::vector<RowwiseIteratorUPtr> iterators;
RETURN_NOT_OK(get_segment_iterators(_context, &iterators));
// merge or union segment iterator
RowwiseIterator* final_iterator;
if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
auto sequence_loc = -1;
if (read_context->sequence_id_idx != -1) {
@ -216,23 +208,23 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
}
}
}
final_iterator = vectorized::new_merge_iterator(
iterators, sequence_loc, read_context->is_unique,
_iterator = vectorized::new_merge_iterator(
std::move(iterators), sequence_loc, read_context->is_unique,
read_context->read_orderby_key_reverse, read_context->merged_rows);
} else {
if (read_context->read_orderby_key_reverse) {
// reverse iterators to read backward for ORDER BY key DESC
std::reverse(iterators.begin(), iterators.end());
}
final_iterator = vectorized::new_union_iterator(iterators);
_iterator = vectorized::new_union_iterator(std::move(iterators));
}
auto s = final_iterator->init(_read_options);
auto s = _iterator->init(_read_options);
if (!s.ok()) {
LOG(WARNING) << "failed to init iterator: " << s.to_string();
_iterator.reset();
return Status::Error<ROWSET_READER_INIT>();
}
_iterator.reset(final_iterator);
return Status::OK();
}

View File

@ -34,7 +34,7 @@ public:
Status init(RowsetReaderContext* read_context) override;
Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIterator*>* out_iters,
std::vector<RowwiseIteratorUPtr>* out_iters,
bool use_cache = false) override;
void reset_read_options() override;
Status next_block(vectorized::Block* block) override;

View File

@ -116,7 +116,7 @@ Status BetaRowsetWriter::add_block(const vectorized::Block* block) {
return _add_block(block, &_segment_writer);
}
vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
RowwiseIteratorUPtr BetaRowsetWriter::_get_segcompaction_reader(
SegCompactionCandidatesSharedPtr segments, std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat, uint64_t* merged_row_stat) {
StorageReadOptions read_options;
@ -133,26 +133,18 @@ vectorized::VMergeIterator* BetaRowsetWriter::_get_segcompaction_reader(
}
seg_iterators.push_back(std::move(iter));
}
std::vector<RowwiseIterator*> iterators;
for (auto& owned_it : seg_iterators) {
// transfer ownership
iterators.push_back(owned_it.release());
}
bool is_unique = (_context.tablet_schema->keys_type() == UNIQUE_KEYS);
bool is_reverse = false;
auto merge_itr =
vectorized::new_merge_iterator(iterators, -1, is_unique, is_reverse, merged_row_stat);
auto merge_itr = vectorized::new_merge_iterator(std::move(seg_iterators), -1, is_unique,
is_reverse, merged_row_stat);
DCHECK(merge_itr);
auto s = merge_itr->init(read_options);
if (!s.ok()) {
LOG(WARNING) << "failed to init iterator: " << s.to_string();
for (auto& itr : iterators) {
delete itr;
}
return nullptr;
}
return (vectorized::VMergeIterator*)merge_itr;
return merge_itr;
}
std::unique_ptr<segment_v2::SegmentWriter> BetaRowsetWriter::_create_segcompaction_writer(
@ -283,14 +275,11 @@ Status BetaRowsetWriter::_do_compact_segments(SegCompactionCandidatesSharedPtr s
_context.tablet_schema->columns().size());
std::unique_ptr<OlapReaderStatistics> stat(new OlapReaderStatistics());
uint64_t merged_row_stat = 0;
vectorized::VMergeIterator* reader =
_get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
if (UNLIKELY(reader == nullptr)) {
auto reader_ptr = _get_segcompaction_reader(segments, schema, stat.get(), &merged_row_stat);
if (UNLIKELY(reader_ptr == nullptr)) {
LOG(WARNING) << "failed to get segcompaction reader";
return Status::Error<SEGCOMPACTION_INIT_READER>();
}
std::unique_ptr<vectorized::VMergeIterator> reader_ptr;
reader_ptr.reset(reader);
auto writer = _create_segcompaction_writer(begin, end);
if (UNLIKELY(writer == nullptr)) {
LOG(WARNING) << "failed to get segcompaction writer";

View File

@ -107,10 +107,10 @@ private:
void _build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta);
Status _segcompaction_if_necessary();
Status _segcompaction_ramaining_if_necessary();
vectorized::VMergeIterator* _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments,
std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat,
uint64_t* merged_row_stat);
RowwiseIteratorUPtr _get_segcompaction_reader(SegCompactionCandidatesSharedPtr segments,
std::shared_ptr<Schema> schema,
OlapReaderStatistics* stat,
uint64_t* merged_row_stat);
std::unique_ptr<segment_v2::SegmentWriter> _create_segcompaction_writer(uint64_t begin,
uint64_t end);
Status _delete_original_segments(uint32_t begin, uint32_t end);

View File

@ -44,7 +44,7 @@ public:
virtual Status init(RowsetReaderContext* read_context) = 0;
virtual Status get_segment_iterators(RowsetReaderContext* read_context,
std::vector<RowwiseIterator*>* out_iters,
std::vector<RowwiseIteratorUPtr>* out_iters,
bool use_cache = false) = 0;
virtual void reset_read_options() = 0;

View File

@ -135,8 +135,7 @@ Status Segment::new_iterator(const Schema& schema, const StorageReadOptions& rea
} else {
iter->reset(new SegmentIterator(this->shared_from_this(), schema));
}
iter->get()->init(read_options);
return Status::OK();
return iter->get()->init(read_options);
}
Status Segment::_parse_footer() {

View File

@ -58,10 +58,8 @@ bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>&
}
return false;
}
Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
std::vector<RowsetReaderSharedPtr> rs_readers;
auto res = _capture_rs_readers(read_params, &rs_readers);
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
auto res = _capture_rs_readers(read_params);
if (!res.ok()) {
LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
@ -71,13 +69,14 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
return res;
}
// check if rowsets are noneoverlapping
_is_rowsets_overlapping = _rowsets_overlapping(rs_readers);
_is_rowsets_overlapping = _rowsets_overlapping(read_params.rs_readers);
_vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
read_params.read_orderby_key_reverse);
_reader_context.is_vec = true;
_reader_context.push_down_agg_type_opt = read_params.push_down_agg_type_opt;
for (auto& rs_reader : rs_readers) {
std::vector<RowsetReaderSharedPtr> valid_rs_readers;
for (auto& rs_reader : read_params.rs_readers) {
// _vcollect_iter.topn_next() will init rs_reader by itself
if (!_vcollect_iter.use_topn_next()) {
RETURN_NOT_OK(rs_reader->init(&_reader_context));
@ -88,11 +87,11 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
return res;
}
if (res.ok()) {
valid_rs_readers->push_back(rs_reader);
valid_rs_readers.push_back(rs_reader);
}
}
RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers));
// _vcollect_iter.topn_next() can not use current_row
if (!_vcollect_iter.use_topn_next()) {
auto status = _vcollect_iter.current_row(&_next_row);
@ -157,8 +156,7 @@ Status BlockReader::init(const ReaderParams& read_params) {
}
}
std::vector<RowsetReaderSharedPtr> rs_readers;
auto status = _init_collect_iter(read_params, &rs_readers);
auto status = _init_collect_iter(read_params);
if (!status.ok()) {
return status;
}

View File

@ -62,8 +62,7 @@ private:
// to minimize the comparison time in merge heap.
Status _unique_key_next_block(Block* block, bool* eof);
Status _init_collect_iter(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers);
Status _init_collect_iter(const ReaderParams& read_params);
void _init_agg_state(const ReaderParams& read_params);

View File

@ -36,11 +36,10 @@ VerticalBlockReader::~VerticalBlockReader() {
}
Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_params,
std::vector<RowwiseIterator*>* segment_iters,
std::vector<RowwiseIteratorUPtr>* segment_iters,
std::vector<bool>* iterator_init_flag,
std::vector<RowsetId>* rowset_ids) {
std::vector<RowsetReaderSharedPtr> rs_readers;
auto res = _capture_rs_readers(read_params, &rs_readers);
auto res = _capture_rs_readers(read_params);
if (!res.ok()) {
LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
<< ", tablet_id:" << read_params.tablet->tablet_id()
@ -51,7 +50,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
}
_reader_context.is_vec = true;
_reader_context.is_vertical_compaction = true;
for (auto& rs_reader : rs_readers) {
for (auto& rs_reader : read_params.rs_readers) {
// segment iterator will be inited here
// In vertical compaction, every group will load segment so we should cache
// segment to avoid tot many s3 head request
@ -84,7 +83,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
// get segment iterators
std::vector<RowwiseIterator*> segment_iters;
std::vector<RowwiseIteratorUPtr> segment_iters;
std::vector<bool> iterator_init_flag;
std::vector<RowsetId> rowset_ids;
RETURN_IF_ERROR(
@ -99,11 +98,11 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
seq_col_idx = read_params.tablet->tablet_schema()->sequence_col_idx();
}
_vcollect_iter = new_vertical_heap_merge_iterator(
segment_iters, iterator_init_flag, rowset_ids, ori_return_col_size,
std::move(segment_iters), iterator_init_flag, rowset_ids, ori_return_col_size,
read_params.tablet->keys_type(), seq_col_idx, _row_sources_buffer);
} else {
_vcollect_iter = new_vertical_mask_merge_iterator(segment_iters, ori_return_col_size,
_row_sources_buffer);
_vcollect_iter = new_vertical_mask_merge_iterator(std::move(segment_iters),
ori_return_col_size, _row_sources_buffer);
}
// init collect iterator
StorageReadOptions opts;

View File

@ -65,7 +65,7 @@ private:
Status _init_collect_iter(const ReaderParams& read_params);
Status _get_segment_iterators(const ReaderParams& read_params,
std::vector<RowwiseIterator*>* segment_iters,
std::vector<RowwiseIteratorUPtr>* segment_iters,
std::vector<bool>* iterator_init_flag,
std::vector<RowsetId>* rowset_ids);

View File

@ -463,9 +463,9 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
// will not be pushed into heap, we should init next one util we find a valid iter
// so this rowset can work in heap
bool pre_iter_invalid = false;
for (auto iter : _origin_iters) {
for (auto& iter : _origin_iters) {
VerticalMergeIteratorContext* ctx = new VerticalMergeIteratorContext(
iter, _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx);
std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx);
_ori_iter_ctx.push_back(ctx);
if (_iterator_init_flags[seg_order] || pre_iter_invalid) {
RETURN_IF_ERROR(ctx->init(opts));
@ -600,9 +600,9 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
_opts = opts;
RowsetId rs_id;
for (auto iter : _origin_iters) {
auto ctx = std::make_unique<VerticalMergeIteratorContext>(iter, rs_id, _ori_return_cols, -1,
-1);
for (auto& iter : _origin_iters) {
auto ctx = std::make_unique<VerticalMergeIteratorContext>(std::move(iter), rs_id,
_ori_return_cols, -1, -1);
_origin_iter_ctx.emplace_back(ctx.release());
}
_origin_iters.clear();
@ -613,7 +613,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
// interfaces to create vertical merge iterator
std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
std::vector<RowwiseIterator*> inputs, const std::vector<bool>& iterator_init_flag,
std::vector<RowwiseIteratorUPtr>&& inputs, const std::vector<bool>& iterator_init_flag,
const std::vector<RowsetId>& rowset_ids, size_t ori_return_cols, KeysType keys_type,
uint32_t seq_col_idx, RowSourcesBuffer* row_sources) {
return std::make_shared<VerticalHeapMergeIterator>(std::move(inputs), iterator_init_flag,
@ -622,9 +622,10 @@ std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
}
std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols,
std::vector<RowwiseIteratorUPtr>&& inputs, size_t ori_return_cols,
RowSourcesBuffer* row_sources) {
return std::make_shared<VerticalMaskMergeIterator>(inputs, ori_return_cols, row_sources);
return std::make_shared<VerticalMaskMergeIterator>(std::move(inputs), ori_return_cols,
row_sources);
}
} // namespace vectorized

View File

@ -129,24 +129,21 @@ private:
// takes ownership of rowwise iterator
class VerticalMergeIteratorContext {
public:
VerticalMergeIteratorContext(RowwiseIterator* iter, RowsetId rowset_id, size_t ori_return_cols,
uint32_t order, uint32_t seq_col_idx)
: _iter(iter),
VerticalMergeIteratorContext(RowwiseIteratorUPtr&& iter, RowsetId rowset_id,
size_t ori_return_cols, uint32_t order, uint32_t seq_col_idx)
: _iter(std::move(iter)),
_rowset_id(rowset_id),
_ori_return_cols(ori_return_cols),
_order(order),
_seq_col_idx(seq_col_idx),
_num_key_columns(iter->schema().num_key_columns()) {}
_num_key_columns(_iter->schema().num_key_columns()) {}
VerticalMergeIteratorContext(const VerticalMergeIteratorContext&) = delete;
VerticalMergeIteratorContext(VerticalMergeIteratorContext&&) = delete;
VerticalMergeIteratorContext& operator=(const VerticalMergeIteratorContext&) = delete;
VerticalMergeIteratorContext& operator=(VerticalMergeIteratorContext&&) = delete;
~VerticalMergeIteratorContext() {
delete _iter;
_iter = nullptr;
}
~VerticalMergeIteratorContext() {}
Status block_reset(const std::shared_ptr<Block>& block);
Status init(const StorageReadOptions& opts);
bool compare(const VerticalMergeIteratorContext& rhs) const;
@ -188,7 +185,7 @@ private:
// Load next block into _block
Status _load_next_block();
RowwiseIterator* _iter;
RowwiseIteratorUPtr _iter;
RowsetId _rowset_id;
size_t _ori_return_cols = 0;
@ -219,7 +216,7 @@ private:
class VerticalHeapMergeIterator : public RowwiseIterator {
public:
// VerticalMergeIterator takes the ownership of input iterators
VerticalHeapMergeIterator(std::vector<RowwiseIterator*> iters,
VerticalHeapMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters,
std::vector<bool> iterator_init_flags,
std::vector<RowsetId> rowset_ids, size_t ori_return_cols,
KeysType keys_type, int32_t seq_col_idx,
@ -255,7 +252,7 @@ private:
private:
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
std::vector<RowwiseIteratorUPtr> _origin_iters;
std::vector<bool> _iterator_init_flags;
std::vector<RowsetId> _rowset_ids;
size_t _ori_return_cols;
@ -289,7 +286,7 @@ private:
class VerticalMaskMergeIterator : public RowwiseIterator {
public:
// VerticalMaskMergeIterator takes the ownership of input iterators
VerticalMaskMergeIterator(std::vector<RowwiseIterator*> iters, size_t ori_return_cols,
VerticalMaskMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters, size_t ori_return_cols,
RowSourcesBuffer* row_sources_buf)
: _origin_iters(std::move(iters)),
_ori_return_cols(ori_return_cols),
@ -318,7 +315,7 @@ private:
private:
// released after build ctx
std::vector<RowwiseIterator*> _origin_iters;
std::vector<RowwiseIteratorUPtr> _origin_iters;
size_t _ori_return_cols = 0;
std::vector<VerticalMergeIteratorContext*> _origin_iter_ctx;
@ -332,12 +329,12 @@ private:
// segment merge iterator
std::shared_ptr<RowwiseIterator> new_vertical_heap_merge_iterator(
std::vector<RowwiseIterator*> inputs, const std::vector<bool>& iterator_init_flag,
std::vector<RowwiseIteratorUPtr>&& inputs, const std::vector<bool>& iterator_init_flag,
const std::vector<RowsetId>& rowset_ids, size_t _ori_return_cols, KeysType key_type,
uint32_t seq_col_idx, RowSourcesBuffer* row_sources_buf);
std::shared_ptr<RowwiseIterator> new_vertical_mask_merge_iterator(
const std::vector<RowwiseIterator*>& inputs, size_t ori_return_cols,
std::vector<RowwiseIteratorUPtr>&& inputs, size_t ori_return_cols,
RowSourcesBuffer* row_sources_buf);
} // namespace vectorized

View File

@ -317,12 +317,13 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
_schema = &(*_origin_iters.begin())->schema();
_schema = &(_origin_iters[0]->schema());
_record_rowids = opts.record_rowids;
for (auto iter : _origin_iters) {
auto ctx = std::make_unique<VMergeIteratorContext>(
iter, _sequence_id_idx, _is_unique, _is_reverse, opts.read_orderby_key_columns);
for (auto& iter : _origin_iters) {
auto ctx = std::make_unique<VMergeIteratorContext>(std::move(iter), _sequence_id_idx,
_is_unique, _is_reverse,
opts.read_orderby_key_columns);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@ -343,12 +344,9 @@ public:
// Iterators' ownership it transferred to this class.
// This class will delete all iterators when destructs
// Client should not use iterators anymore.
VUnionIterator(std::vector<RowwiseIterator*>& v) : _origin_iters(v.begin(), v.end()) {}
VUnionIterator(std::vector<RowwiseIteratorUPtr>&& v) : _origin_iters(std::move(v)) {}
~VUnionIterator() override {
std::for_each(_origin_iters.begin(), _origin_iters.end(),
std::default_delete<RowwiseIterator>());
}
~VUnionIterator() override {}
Status init(const StorageReadOptions& opts) override;
@ -368,7 +366,7 @@ public:
private:
const Schema* _schema = nullptr;
RowwiseIterator* _cur_iter = nullptr;
std::deque<RowwiseIterator*> _origin_iters;
std::vector<RowwiseIteratorUPtr> _origin_iters;
};
Status VUnionIterator::init(const StorageReadOptions& opts) {
@ -376,10 +374,10 @@ Status VUnionIterator::init(const StorageReadOptions& opts) {
return Status::OK();
}
for (auto iter : _origin_iters) {
for (auto& iter : _origin_iters) {
RETURN_IF_ERROR(iter->init(opts));
}
_cur_iter = *(_origin_iters.begin());
_cur_iter = _origin_iters.back().get();
_schema = &_cur_iter->schema();
return Status::OK();
}
@ -388,10 +386,9 @@ Status VUnionIterator::next_batch(Block* block) {
while (_cur_iter != nullptr) {
auto st = _cur_iter->next_batch(block);
if (st.is<END_OF_FILE>()) {
delete _cur_iter;
_origin_iters.pop_front();
_origin_iters.pop_back();
if (!_origin_iters.empty()) {
_cur_iter = *(_origin_iters.begin());
_cur_iter = _origin_iters.back().get();
} else {
_cur_iter = nullptr;
}
@ -410,27 +407,29 @@ Status VUnionIterator::current_block_row_locations(std::vector<RowLocation>* loc
return _cur_iter->current_block_row_locations(locations);
}
RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx,
bool is_unique, bool is_reverse, uint64_t* merged_rows) {
RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&& inputs,
int sequence_id_idx, bool is_unique, bool is_reverse,
uint64_t* merged_rows) {
if (inputs.size() == 1) {
return *(inputs.begin());
return std::move(inputs[0]);
}
return new VMergeIterator(inputs, sequence_id_idx, is_unique, is_reverse, merged_rows);
return std::make_unique<VMergeIterator>(std::move(inputs), sequence_id_idx, is_unique,
is_reverse, merged_rows);
}
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs) {
RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& inputs) {
if (inputs.size() == 1) {
return *(inputs.begin());
return std::move(inputs[0]);
}
return new VUnionIterator(inputs);
return std::make_unique<VUnionIterator>(std::move(inputs));
}
RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema) {
return new VStatisticsIterator(segment, schema);
}
RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows) {
return new VAutoIncrementIterator(schema, num_rows);
RowwiseIteratorUPtr new_auto_increment_iterator(const Schema& schema, size_t num_rows) {
return std::make_unique<VAutoIncrementIterator>(schema, num_rows);
}
} // namespace vectorized

View File

@ -65,14 +65,14 @@ private:
// }
class VMergeIteratorContext {
public:
VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique,
VMergeIteratorContext(RowwiseIteratorUPtr&& iter, int sequence_id_idx, bool is_unique,
bool is_reverse, std::vector<uint32_t>* read_orderby_key_columns)
: _iter(iter),
: _iter(std::move(iter)),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
_num_columns(iter->schema().num_column_ids()),
_num_key_columns(iter->schema().num_key_columns()),
_num_columns(_iter->schema().num_column_ids()),
_num_key_columns(_iter->schema().num_key_columns()),
_compare_columns(read_orderby_key_columns) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
@ -80,10 +80,7 @@ public:
VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
VMergeIteratorContext& operator=(VMergeIteratorContext&&) = delete;
~VMergeIteratorContext() {
delete _iter;
_iter = nullptr;
}
~VMergeIteratorContext() {}
Status block_reset(const std::shared_ptr<Block>& block);
@ -143,7 +140,7 @@ private:
// Load next block into _block
Status _load_next_block();
RowwiseIterator* _iter;
RowwiseIteratorUPtr _iter;
int _sequence_id_idx = -1;
bool _is_unique = false;
@ -171,9 +168,9 @@ private:
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes the ownership of input iterators
VMergeIterator(std::vector<RowwiseIterator*>& iters, int sequence_id_idx, bool is_unique,
VMergeIterator(std::vector<RowwiseIteratorUPtr>&& iters, int sequence_id_idx, bool is_unique,
bool is_reverse, uint64_t* merged_rows)
: _origin_iters(iters),
: _origin_iters(std::move(iters)),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
_is_reverse(is_reverse),
@ -204,7 +201,7 @@ public:
bool update_profile(RuntimeProfile* profile) override {
if (!_origin_iters.empty()) {
return (*_origin_iters.begin())->update_profile(profile);
return _origin_iters[0]->update_profile(profile);
}
return false;
}
@ -277,7 +274,7 @@ private:
}
// It will be released after '_merge_heap' has been built.
std::vector<RowwiseIterator*> _origin_iters;
std::vector<RowwiseIteratorUPtr> _origin_iters;
const Schema* _schema = nullptr;
@ -308,21 +305,21 @@ private:
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx,
bool is_unique, bool is_reverse, uint64_t* merged_rows);
RowwiseIteratorUPtr new_merge_iterator(std::vector<RowwiseIteratorUPtr>&& inputs,
int sequence_id_idx, bool is_unique, bool is_reverse,
uint64_t* merged_rows);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
//
// Inputs iterators' ownership is taken by created union iterator. And client
// should delete returned iterator after usage.
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs);
// Inputs iterators' ownership is taken by created union iterator.
RowwiseIteratorUPtr new_union_iterator(std::vector<RowwiseIteratorUPtr>&& inputs);
// Create an auto increment iterator which returns num_rows data in format of schema.
// This class aims to be used in unit test.
//
// Client should delete returned iterator.
RowwiseIterator* new_auto_increment_iterator(const Schema& schema, size_t num_rows);
RowwiseIteratorUPtr new_auto_increment_iterator(const Schema& schema, size_t num_rows);
RowwiseIterator* new_vstatistics_iterator(std::shared_ptr<Segment> segment, const Schema& schema);

View File

@ -86,19 +86,17 @@ TEST(VGenericIteratorsTest, AutoIncrement) {
EXPECT_EQ(row_count + 2, (*c2)[i].get<int>());
row_count++;
}
delete iter;
}
TEST(VGenericIteratorsTest, Union) {
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
auto iter = vectorized::new_union_iterator(inputs);
auto iter = vectorized::new_union_iterator(std::move(inputs));
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@ -117,34 +115,30 @@ TEST(VGenericIteratorsTest, Union) {
auto c1 = block.get_by_position(1).column;
auto c2 = block.get_by_position(2).column;
size_t row_count = 0;
for (size_t i = 0; i < block.rows(); ++i) {
size_t base_value = row_count;
if (row_count >= 300) {
for (int i = 0; i < block.rows(); ++i) {
size_t base_value = i;
if (i >= 500) {
base_value -= 500;
} else if (i >= 300) {
base_value -= 300;
} else if (row_count >= 100) {
base_value -= 100;
}
EXPECT_EQ(base_value, (*c0)[i].get<int>());
EXPECT_EQ(base_value + 1, (*c1)[i].get<int>());
EXPECT_EQ(base_value + 2, (*c2)[i].get<int>());
row_count++;
}
delete iter;
}
TEST(VGenericIteratorsTest, MergeAgg) {
EXPECT_TRUE(1);
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
auto iter = vectorized::new_merge_iterator(inputs, -1, false, false, nullptr);
auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, false, false, nullptr);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@ -180,20 +174,18 @@ TEST(VGenericIteratorsTest, MergeAgg) {
EXPECT_EQ(base_value + 2, (*c2)[i].get<int>());
row_count++;
}
delete iter;
}
TEST(VGenericIteratorsTest, MergeUnique) {
EXPECT_TRUE(1);
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
std::vector<RowwiseIteratorUPtr> inputs;
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
auto iter = vectorized::new_merge_iterator(inputs, -1, true, false, nullptr);
auto iter = vectorized::new_merge_iterator(std::move(inputs), -1, true, false, nullptr);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@ -221,8 +213,6 @@ TEST(VGenericIteratorsTest, MergeUnique) {
EXPECT_EQ(base_value + 2, (*c2)[i].get<int>());
row_count++;
}
delete iter;
}
// only used for Seq Column UT
@ -300,7 +290,7 @@ public:
TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
EXPECT_TRUE(1);
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
std::vector<RowwiseIteratorUPtr> inputs;
int seq_column_id = 2;
int seg_iter_num = 10;
@ -311,11 +301,12 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
// input seg file in Ascending, expect output seq column in Descending
for (int i = 0; i < seg_iter_num; i++) {
int seq_id_in_every_file = i;
inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin, seq_column_id,
seq_id_in_every_file));
inputs.push_back(std::make_unique<SeqColumnUtIterator>(
schema, num_rows, rows_begin, seq_column_id, seq_id_in_every_file));
}
auto iter = vectorized::new_merge_iterator(inputs, seq_column_id, true, false, nullptr);
auto iter =
vectorized::new_merge_iterator(std::move(inputs), seq_column_id, true, false, nullptr);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@ -335,8 +326,6 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
auto seq_col = block.get_by_position(seq_column_id).column;
size_t actual_value = (*seq_col)[0].get<int>();
EXPECT_EQ(seg_iter_num - 1, actual_value);
delete iter;
}
} // namespace vectorized