diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index 22446a03d1..9ef33459cc 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -89,18 +89,18 @@ void CollectIterator::build_heap(const std::vector& rs_re } Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, cumu_children.size() > 1, _reverse, - _reader->_sequence_col_idx, sort_type, sort_col_num); + _reader->_sequence_col_idx, &_reader->_merged_rows, sort_type, sort_col_num); cumu_iter->init(); _inner_iter.reset(new Level1Iterator(std::list{*base_reader_child, cumu_iter}, _merge, - _reverse, _reader->_sequence_col_idx, sort_type, sort_col_num)); + _reverse, _reader->_sequence_col_idx, &_reader->_merged_rows, sort_type, sort_col_num)); } else { // _children.size() == 1 _inner_iter.reset(new Level1Iterator(_children, _merge, - _reverse, _reader->_sequence_col_idx, sort_type, sort_col_num)); + _reverse, _reader->_sequence_col_idx, &_reader->_merged_rows, sort_type, sort_col_num)); } } else { _inner_iter.reset(new Level1Iterator(_children, _merge, - _reverse, _reader->_sequence_col_idx, sort_type, sort_col_num)); + _reverse, _reader->_sequence_col_idx, &_reader->_merged_rows, sort_type, sort_col_num)); } _inner_iter->init(); // Clear _children earlier to release any related references @@ -123,14 +123,20 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a auto seq_first_cell = first->cell(_sequence_id_idx); auto seq_second_cell = second->cell(_sequence_id_idx); auto res = first->schema()->column(_sequence_id_idx)->compare_cell(seq_first_cell, seq_second_cell); - if (res != 0) return res < 0; + if (res != 0) { + res < 0 ? a->set_need_skip(true) : b->set_need_skip(true); + return res < 0; + } + } // if row cursors equal, compare data version. // read data from higher version to lower version. // for UNIQUE_KEYS just read the highest version and no need agg_update. // for AGG_KEYS if a version is deleted, the lower version no need to agg_update if (_reverse) { - return a->version() < b->version(); + auto lower = a->version() < b->version(); + lower ? a->set_need_skip(true) : b->set_need_skip(true); + return lower; } return a->version() > b->version(); } @@ -265,9 +271,9 @@ OLAPStatus CollectIterator::Level0Iterator::next(const RowCursor** row, bool* de CollectIterator::Level1Iterator::Level1Iterator( const std::list& children, - bool merge, bool reverse, int sequence_id_idx, + bool merge, bool reverse, int sequence_id_idx, uint64_t* merge_count, SortType sort_type, int sort_col_num) - : _children(children), _merge(merge), _reverse(reverse), + : _children(children), _merge(merge), _reverse(reverse), _merged_rows(merge_count), _sort_type(sort_type), _sort_col_num(sort_col_num) {} CollectIterator::LevelIterator::~LevelIterator() = default; @@ -383,6 +389,12 @@ inline OLAPStatus CollectIterator::Level1Iterator::_merge_next(const RowCursor** LOG(WARNING) << "failed to get next from child, res=" << res; return res; } + + if (_cur_child->need_skip()) { + (*_merged_rows)++; + _cur_child->set_need_skip(false); + return _merge_next(row, delete_flag); + } *row = _cur_child->current_row(delete_flag); return OLAP_SUCCESS; } diff --git a/be/src/olap/collect_iterator.h b/be/src/olap/collect_iterator.h index 60b670efa9..858729cc36 100644 --- a/be/src/olap/collect_iterator.h +++ b/be/src/olap/collect_iterator.h @@ -67,6 +67,19 @@ private: virtual OLAPStatus next(const RowCursor** row, bool* delete_flag) = 0; virtual ~LevelIterator() = 0; + + bool need_skip() const { + return _skip_row; + } + + void set_need_skip(bool skip) const { + _skip_row = skip; + } + + // Only use in unique reader. Heap will set _skip_row = true. + // when build heap find the row in LevelIterator have same key but lower version or sequence + // the row of LevelIteratro should be skiped to prevent useless compare and function call + mutable bool _skip_row = false; }; // Compare row cursors between multiple merge elements, @@ -145,7 +158,7 @@ private: public: Level1Iterator(const std::list& children, bool merge, bool reverse, - int sequence_id_idx, SortType sort_type, int sort_col_num); + int sequence_id_idx, uint64_t* merge_count, SortType sort_type, int sort_col_num); OLAPStatus init() override; @@ -184,6 +197,8 @@ private: // used when `_merge == false` int _child_idx = 0; int _sequence_id_idx = -1; + + uint64_t* _merged_rows = nullptr; SortType _sort_type; int _sort_col_num; }; diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 121d86eb7e..1e684692e4 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -843,6 +843,10 @@ OLAPStatus Reader::_init_delete_condition(const ReaderParams& read_params) { read_params.version.second, this); _tablet->release_header_lock(); + // Only BASE_COMPACTION need set filter_delete = true + // other reader type: + // QUERY will filter the row in query layer to keep right result use where clause. + // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset if (read_params.reader_type == READER_BASE_COMPACTION) { _filter_delete = true; } diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp index 37daa92689..430fa30ca8 100644 --- a/be/src/olap/tuple_reader.cpp +++ b/be/src/olap/tuple_reader.cpp @@ -184,7 +184,6 @@ OLAPStatus TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem ObjectPool* agg_pool, bool* eof) { *eof = false; bool cur_delete_flag = false; - int64_t merged_count = 0; do { if (UNLIKELY(_next_key == nullptr)) { *eof = true; @@ -196,23 +195,13 @@ OLAPStatus TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem // merge the lower versions direct_copy_row(row_cursor, *_next_key); // skip the lower version rows; - while (nullptr != _next_key) { - auto res = _collect_iter->next(&_next_key, &_next_delete_flag); - if (UNLIKELY(res == OLAP_ERR_DATA_EOF)) { - break; - } - + auto res = _collect_iter->next(&_next_key, &_next_delete_flag); + if (LIKELY(res != OLAP_ERR_DATA_EOF)) { if (UNLIKELY(res != OLAP_SUCCESS)) { LOG(WARNING) << "next failed: " << res; return res; } - - // break while can NOT doing aggregation - if (!equal_row(_key_cids, *row_cursor, *_next_key)) { - agg_finalize_row(_value_cids, row_cursor, mem_pool); - break; - } - ++merged_count; + agg_finalize_row(_value_cids, row_cursor, mem_pool); cur_delete_flag = _next_delete_flag; } @@ -223,7 +212,6 @@ OLAPStatus TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem } _stats.rows_del_filtered++; } while (cur_delete_flag); - _merged_rows += merged_count; return OLAP_SUCCESS; }