[refactor](heap sort) Simplify sorted block view (#33477)
This commit is contained in:
@ -84,18 +84,18 @@ Status HeapSorter::append_block(Block* block) {
|
||||
Block tmp_block = block->clone_empty();
|
||||
tmp_block.swap(*block);
|
||||
size_t num_rows = tmp_block.rows();
|
||||
HeapSortCursorBlockView block_view_val(std::move(tmp_block), _sort_description);
|
||||
auto* block_view = new SharedHeapSortCursorBlockView(std::move(block_view_val));
|
||||
block_view->ref();
|
||||
Defer defer([&] { block_view->unref(); });
|
||||
auto block_view =
|
||||
std::make_shared<HeapSortCursorBlockView>(std::move(tmp_block), _sort_description);
|
||||
bool filtered = false;
|
||||
if (_heap_size == _heap->size()) {
|
||||
{
|
||||
SCOPED_TIMER(_topn_filter_timer);
|
||||
_do_filter(block_view->value(), num_rows);
|
||||
_do_filter(*block_view, num_rows);
|
||||
}
|
||||
size_t remain_rows = block_view->value().block.rows();
|
||||
size_t remain_rows = block_view->block.rows();
|
||||
_topn_filter_rows += (num_rows - remain_rows);
|
||||
COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows);
|
||||
filtered = remain_rows == 0;
|
||||
for (size_t i = 0; i < remain_rows; ++i) {
|
||||
HeapSortCursorImpl cursor(i, block_view);
|
||||
_heap->replace_top_if_less(std::move(cursor));
|
||||
@ -114,8 +114,8 @@ Status HeapSorter::append_block(Block* block) {
|
||||
_heap->replace_top_if_less(std::move(cursor));
|
||||
}
|
||||
}
|
||||
if (block_view->ref_count() > 1) {
|
||||
_data_size += block_view->value().block.allocated_bytes();
|
||||
if (!filtered) {
|
||||
_data_size += block_view->block.allocated_bytes();
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -58,43 +58,16 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
// Use `SharedHeapSortCursorBlockView` for `HeapSortCursorBlockView` instead of shared_ptr because there will be no
|
||||
// concurrent operation for `HeapSortCursorBlockView` and we don't need the lock inside shared_ptr
|
||||
class SharedHeapSortCursorBlockView {
|
||||
public:
|
||||
SharedHeapSortCursorBlockView(HeapSortCursorBlockView&& reference)
|
||||
: _ref_count(0), _reference(std::move(reference)) {}
|
||||
SharedHeapSortCursorBlockView(const SharedHeapSortCursorBlockView&) = delete;
|
||||
void unref() noexcept {
|
||||
DCHECK_GT(_ref_count, 0);
|
||||
_ref_count--;
|
||||
if (_ref_count == 0) {
|
||||
delete this;
|
||||
}
|
||||
}
|
||||
void ref() noexcept { _ref_count++; }
|
||||
|
||||
HeapSortCursorBlockView& value() { return _reference; }
|
||||
|
||||
int ref_count() const { return _ref_count; }
|
||||
|
||||
private:
|
||||
~SharedHeapSortCursorBlockView() noexcept = default;
|
||||
int _ref_count;
|
||||
HeapSortCursorBlockView _reference;
|
||||
};
|
||||
using HeapSortCursorBlockSPtr = std::shared_ptr<HeapSortCursorBlockView>;
|
||||
|
||||
struct HeapSortCursorImpl {
|
||||
public:
|
||||
HeapSortCursorImpl(int row_id, SharedHeapSortCursorBlockView* block_view)
|
||||
: _row_id(row_id), _block_view(block_view) {
|
||||
block_view->ref();
|
||||
}
|
||||
HeapSortCursorImpl(int row_id, HeapSortCursorBlockSPtr block_view)
|
||||
: _row_id(row_id), _block_view(block_view) {}
|
||||
|
||||
HeapSortCursorImpl(const HeapSortCursorImpl& other) {
|
||||
_row_id = other._row_id;
|
||||
_block_view = other._block_view;
|
||||
_block_view->ref();
|
||||
}
|
||||
|
||||
HeapSortCursorImpl(HeapSortCursorImpl&& other) {
|
||||
@ -109,19 +82,15 @@ public:
|
||||
return *this;
|
||||
}
|
||||
|
||||
~HeapSortCursorImpl() {
|
||||
if (_block_view) {
|
||||
_block_view->unref();
|
||||
}
|
||||
}
|
||||
~HeapSortCursorImpl() = default;
|
||||
|
||||
size_t row_id() const { return _row_id; }
|
||||
|
||||
const ColumnRawPtrs& sort_columns() const { return _block_view->value().sort_columns; }
|
||||
const ColumnRawPtrs& sort_columns() const { return _block_view->sort_columns; }
|
||||
|
||||
const Block* block() const { return &_block_view->value().block; }
|
||||
const Block* block() const { return &_block_view->block; }
|
||||
|
||||
const SortDescription& sort_desc() const { return _block_view->value().desc; }
|
||||
const SortDescription& sort_desc() const { return _block_view->desc; }
|
||||
|
||||
bool operator<(const HeapSortCursorImpl& rhs) const {
|
||||
for (size_t i = 0; i < sort_desc().size(); ++i) {
|
||||
@ -143,7 +112,7 @@ public:
|
||||
|
||||
private:
|
||||
size_t _row_id;
|
||||
SharedHeapSortCursorBlockView* _block_view;
|
||||
HeapSortCursorBlockSPtr _block_view;
|
||||
};
|
||||
|
||||
/** Cursor allows to compare rows in different blocks (and parts).
|
||||
|
||||
Reference in New Issue
Block a user