## Proposed changes pick #39306 <!--Describe your changes.-->
This commit is contained in:
@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) {
|
||||
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
|
||||
DCHECK(input_block->columns() == sorted_block.columns());
|
||||
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
|
||||
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
|
||||
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PartitionSorter::prepare_for_read() {
|
||||
auto& cursors = _state->get_cursors();
|
||||
auto& blocks = _state->get_sorted_block();
|
||||
auto& priority_queue = _state->get_priority_queue();
|
||||
for (auto& block : blocks) {
|
||||
cursors.emplace_back(block, _sort_description);
|
||||
}
|
||||
for (auto& cursor : cursors) {
|
||||
priority_queue.push(MergeSortCursor(&cursor));
|
||||
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
|
||||
}
|
||||
blocks.clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
|
||||
}
|
||||
|
||||
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
|
||||
if (_state->get_sorted_block().empty()) {
|
||||
if (_state->get_priority_queue().empty()) {
|
||||
*eos = true;
|
||||
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
|
||||
block->swap(*_state->get_priority_queue().top().impl->block);
|
||||
block->set_num_rows(_partition_inner_limit);
|
||||
*eos = true;
|
||||
} else {
|
||||
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
|
||||
auto& sorted_block = _state->get_sorted_block()[0];
|
||||
block->swap(sorted_block);
|
||||
block->set_num_rows(_partition_inner_limit);
|
||||
*eos = true;
|
||||
} else {
|
||||
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
|
||||
}
|
||||
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
|
||||
const auto& sorted_block = _state->get_sorted_block()[0];
|
||||
size_t num_columns = sorted_block.columns();
|
||||
auto& priority_queue = _state->get_priority_queue();
|
||||
if (priority_queue.empty()) {
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
}
|
||||
const auto& sorted_block = priority_queue.top().impl->block;
|
||||
size_t num_columns = sorted_block->columns();
|
||||
MutableBlock m_block =
|
||||
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
|
||||
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
|
||||
MutableColumns& merged_columns = m_block.mutable_columns();
|
||||
size_t current_output_rows = 0;
|
||||
auto& priority_queue = _state->get_priority_queue();
|
||||
|
||||
bool get_enough_data = false;
|
||||
while (!priority_queue.empty()) {
|
||||
@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
|
||||
//1 row_number no need to check distinct, just output partition_inner_limit row
|
||||
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
|
||||
for (size_t i = 0; i < num_columns; ++i) {
|
||||
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
|
||||
}
|
||||
} else {
|
||||
//rows has get enough
|
||||
@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
|
||||
}
|
||||
}
|
||||
for (size_t i = 0; i < num_columns; ++i) {
|
||||
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
|
||||
*_previous_row = current;
|
||||
}
|
||||
for (size_t i = 0; i < num_columns; ++i) {
|
||||
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
|
||||
}
|
||||
current_output_rows++;
|
||||
break;
|
||||
|
||||
@ -50,7 +50,7 @@ public:
|
||||
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}
|
||||
|
||||
void reset() {
|
||||
impl = nullptr;
|
||||
impl->reset();
|
||||
row = 0;
|
||||
}
|
||||
bool compare_two_rows(const MergeSortCursor& rhs) const {
|
||||
@ -67,7 +67,7 @@ public:
|
||||
return true;
|
||||
}
|
||||
int row = 0;
|
||||
MergeSortCursorImpl* impl = nullptr;
|
||||
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
|
||||
};
|
||||
|
||||
class PartitionSorter final : public Sorter {
|
||||
|
||||
@ -59,48 +59,44 @@ namespace doris::vectorized {
|
||||
void MergeSorterState::reset() {
|
||||
auto empty_queue = std::priority_queue<MergeSortCursor>();
|
||||
priority_queue_.swap(empty_queue);
|
||||
std::vector<MergeSortCursorImpl> empty_cursors(0);
|
||||
cursors_.swap(empty_cursors);
|
||||
std::vector<Block> empty_blocks(0);
|
||||
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
|
||||
std::vector<std::shared_ptr<Block>> empty_blocks(0);
|
||||
sorted_blocks_.swap(empty_blocks);
|
||||
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
|
||||
in_mem_sorted_bocks_size_ = 0;
|
||||
}
|
||||
|
||||
Status MergeSorterState::add_sorted_block(Block& block) {
|
||||
auto rows = block.rows();
|
||||
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
|
||||
auto rows = block->rows();
|
||||
if (0 == rows) {
|
||||
return Status::OK();
|
||||
return;
|
||||
}
|
||||
in_mem_sorted_bocks_size_ += block.bytes();
|
||||
sorted_blocks_.emplace_back(std::move(block));
|
||||
in_mem_sorted_bocks_size_ += block->bytes();
|
||||
sorted_blocks_.emplace_back(block);
|
||||
num_rows_ += rows;
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
|
||||
for (auto& block : sorted_blocks_) {
|
||||
cursors_.emplace_back(block, sort_description);
|
||||
}
|
||||
|
||||
if (sorted_blocks_.size() > 1) {
|
||||
for (auto& cursor : cursors_) {
|
||||
priority_queue_.emplace(&cursor);
|
||||
}
|
||||
priority_queue_.emplace(
|
||||
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
|
||||
}
|
||||
|
||||
sorted_blocks_.clear();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
|
||||
bool* eos) {
|
||||
if (sorted_blocks_.empty()) {
|
||||
DCHECK(sorted_blocks_.empty());
|
||||
DCHECK(unsorted_block_->empty());
|
||||
if (priority_queue_.empty()) {
|
||||
*eos = true;
|
||||
} else if (sorted_blocks_.size() == 1) {
|
||||
} else if (priority_queue_.size() == 1) {
|
||||
if (offset_ != 0) {
|
||||
sorted_blocks_[0].skip_num_rows(offset_);
|
||||
priority_queue_.top().impl->block->skip_num_rows(offset_);
|
||||
}
|
||||
block->swap(sorted_blocks_[0]);
|
||||
block->swap(*priority_queue_.top().impl->block);
|
||||
*eos = true;
|
||||
} else {
|
||||
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
|
||||
@ -110,9 +106,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
|
||||
|
||||
Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
|
||||
bool* eos) {
|
||||
size_t num_columns = sorted_blocks_[0].columns();
|
||||
if (priority_queue_.empty()) {
|
||||
*eos = true;
|
||||
return Status::OK();
|
||||
}
|
||||
size_t num_columns = priority_queue_.top().impl->block->columns();
|
||||
|
||||
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
|
||||
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
|
||||
block, *priority_queue_.top().impl->block);
|
||||
MutableColumns& merged_columns = m_block.mutable_columns();
|
||||
|
||||
/// Take rows from queue in right order and push to 'merged'.
|
||||
@ -123,7 +124,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
|
||||
|
||||
if (offset_ == 0) {
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
|
||||
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
|
||||
++merged_rows;
|
||||
} else {
|
||||
offset_--;
|
||||
@ -134,7 +135,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
|
||||
priority_queue_.push(current);
|
||||
}
|
||||
|
||||
if (merged_rows == batch_size) break;
|
||||
if (merged_rows == batch_size) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
block->set_columns(std::move(merged_columns));
|
||||
|
||||
@ -261,22 +264,22 @@ Status FullSorter::_do_sort() {
|
||||
// if one block totally greater the heap top of _block_priority_queue
|
||||
// we can throw the block data directly.
|
||||
if (_state->num_rows() < _offset + _limit) {
|
||||
static_cast<void>(_state->add_sorted_block(desc_block));
|
||||
_block_priority_queue.emplace(_pool->add(
|
||||
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
|
||||
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
|
||||
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
|
||||
_state->last_sorted_block(), _sort_description));
|
||||
} else {
|
||||
auto tmp_cursor_impl =
|
||||
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
|
||||
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
|
||||
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
|
||||
Block::create_shared(std::move(desc_block)), _sort_description);
|
||||
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
|
||||
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
|
||||
static_cast<void>(_state->add_sorted_block(desc_block));
|
||||
_block_priority_queue.emplace(_pool->add(
|
||||
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
|
||||
_state->add_sorted_block(tmp_cursor_impl->block);
|
||||
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
|
||||
_state->last_sorted_block(), _sort_description));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// dispose normal sort logic
|
||||
static_cast<void>(_state->add_sorted_block(desc_block));
|
||||
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
|
||||
}
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -59,7 +59,7 @@ public:
|
||||
|
||||
~MergeSorterState() = default;
|
||||
|
||||
Status add_sorted_block(Block& block);
|
||||
void add_sorted_block(std::shared_ptr<Block> block);
|
||||
|
||||
Status build_merge_tree(const SortDescription& sort_description);
|
||||
|
||||
@ -72,23 +72,19 @@ public:
|
||||
|
||||
uint64_t num_rows() const { return num_rows_; }
|
||||
|
||||
Block& last_sorted_block() { return sorted_blocks_.back(); }
|
||||
std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
|
||||
|
||||
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
|
||||
std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
|
||||
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
|
||||
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
|
||||
void reset();
|
||||
|
||||
std::unique_ptr<Block> unsorted_block_;
|
||||
|
||||
private:
|
||||
int _calc_spill_blocks_to_merge() const;
|
||||
|
||||
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);
|
||||
|
||||
std::priority_queue<MergeSortCursor> priority_queue_;
|
||||
std::vector<MergeSortCursorImpl> cursors_;
|
||||
std::vector<Block> sorted_blocks_;
|
||||
std::vector<std::shared_ptr<Block>> sorted_blocks_;
|
||||
size_t in_mem_sorted_bocks_size_ = 0;
|
||||
uint64_t num_rows_ = 0;
|
||||
|
||||
|
||||
@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) {
|
||||
// if one block totally greater the heap top of _block_priority_queue
|
||||
// we can throw the block data directly.
|
||||
if (_state->num_rows() < _offset + _limit) {
|
||||
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
|
||||
_block_priority_queue.emplace(_pool->add(
|
||||
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
|
||||
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
|
||||
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
|
||||
_state->last_sorted_block(), _sort_description));
|
||||
} else {
|
||||
auto tmp_cursor_impl =
|
||||
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
|
||||
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
|
||||
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
|
||||
Block::create_shared(std::move(sorted_block)), _sort_description);
|
||||
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
|
||||
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
|
||||
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
|
||||
_block_priority_queue.emplace(_pool->add(
|
||||
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
|
||||
_state->add_sorted_block(block_cursor.impl->block);
|
||||
_block_priority_queue.emplace(tmp_cursor_impl);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -120,7 +120,8 @@ private:
|
||||
* It is used in priority queue.
|
||||
*/
|
||||
struct MergeSortCursorImpl {
|
||||
ColumnRawPtrs all_columns;
|
||||
ENABLE_FACTORY_CREATOR(MergeSortCursorImpl);
|
||||
std::shared_ptr<Block> block;
|
||||
ColumnRawPtrs sort_columns;
|
||||
SortDescription desc;
|
||||
size_t sort_columns_size = 0;
|
||||
@ -130,37 +131,30 @@ struct MergeSortCursorImpl {
|
||||
MergeSortCursorImpl() = default;
|
||||
virtual ~MergeSortCursorImpl() = default;
|
||||
|
||||
MergeSortCursorImpl(Block& block, const SortDescription& desc_)
|
||||
: desc(desc_), sort_columns_size(desc.size()) {
|
||||
reset(block);
|
||||
MergeSortCursorImpl(std::shared_ptr<Block> block_, const SortDescription& desc_)
|
||||
: block(block_), desc(desc_), sort_columns_size(desc.size()) {
|
||||
reset();
|
||||
}
|
||||
|
||||
MergeSortCursorImpl(const SortDescription& desc_)
|
||||
: desc(desc_), sort_columns_size(desc.size()) {}
|
||||
: block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {}
|
||||
bool empty() const { return rows == 0; }
|
||||
|
||||
/// Set the cursor to the beginning of the new block.
|
||||
void reset(Block& block) {
|
||||
all_columns.clear();
|
||||
void reset() {
|
||||
sort_columns.clear();
|
||||
|
||||
auto columns = block.get_columns_and_convert();
|
||||
size_t num_columns = columns.size();
|
||||
|
||||
for (size_t j = 0; j < num_columns; ++j) {
|
||||
all_columns.push_back(columns[j].get());
|
||||
}
|
||||
|
||||
auto columns = block->get_columns_and_convert();
|
||||
for (size_t j = 0, size = desc.size(); j < size; ++j) {
|
||||
auto& column_desc = desc[j];
|
||||
size_t column_number = !column_desc.column_name.empty()
|
||||
? block.get_position_by_name(column_desc.column_name)
|
||||
? block->get_position_by_name(column_desc.column_name)
|
||||
: column_desc.column_number;
|
||||
sort_columns.push_back(columns[column_number].get());
|
||||
}
|
||||
|
||||
pos = 0;
|
||||
rows = all_columns[0]->size();
|
||||
rows = block->rows();
|
||||
}
|
||||
|
||||
bool is_first() const { return pos == 0; }
|
||||
@ -174,11 +168,13 @@ struct MergeSortCursorImpl {
|
||||
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
|
||||
|
||||
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
|
||||
ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl);
|
||||
BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
|
||||
const VExprContextSPtrs& ordering_expr,
|
||||
const std::vector<bool>& is_asc_order,
|
||||
const std::vector<bool>& nulls_first)
|
||||
: _ordering_expr(ordering_expr), _block_supplier(block_supplier) {
|
||||
block = Block::create_shared();
|
||||
sort_columns_size = ordering_expr.size();
|
||||
|
||||
desc.resize(ordering_expr.size());
|
||||
@ -195,21 +191,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
|
||||
}
|
||||
|
||||
bool has_next_block() override {
|
||||
_block.clear();
|
||||
block->clear();
|
||||
Status status;
|
||||
do {
|
||||
status = _block_supplier(&_block, &_is_eof);
|
||||
} while (_block.empty() && !_is_eof && status.ok());
|
||||
status = _block_supplier(block.get(), &_is_eof);
|
||||
} while (block->empty() && !_is_eof && status.ok());
|
||||
// If status not ok, upper callers could not detect whether it is eof or error.
|
||||
// So that fatal here, and should throw exception in the future.
|
||||
if (status.ok() && !_block.empty()) {
|
||||
if (status.ok() && !block->empty()) {
|
||||
if (_ordering_expr.size() > 0) {
|
||||
for (int i = 0; status.ok() && i < desc.size(); ++i) {
|
||||
// TODO yiguolei: throw exception if status not ok in the future
|
||||
status = _ordering_expr[i]->execute(&_block, &desc[i].column_number);
|
||||
status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number);
|
||||
}
|
||||
}
|
||||
MergeSortCursorImpl::reset(_block);
|
||||
MergeSortCursorImpl::reset();
|
||||
return status.ok();
|
||||
} else if (!status.ok()) {
|
||||
throw std::runtime_error(status.msg());
|
||||
@ -221,32 +217,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
|
||||
if (_is_eof) {
|
||||
return nullptr;
|
||||
}
|
||||
return &_block;
|
||||
}
|
||||
|
||||
size_t columns_num() const { return all_columns.size(); }
|
||||
|
||||
Block create_empty_blocks() const {
|
||||
size_t num_columns = columns_num();
|
||||
MutableColumns columns(num_columns);
|
||||
for (size_t i = 0; i < num_columns; ++i) {
|
||||
columns[i] = all_columns[i]->clone_empty();
|
||||
}
|
||||
return _block.clone_with_columns(std::move(columns));
|
||||
return block.get();
|
||||
}
|
||||
|
||||
VExprContextSPtrs _ordering_expr;
|
||||
Block _block;
|
||||
BlockSupplier _block_supplier {};
|
||||
bool _is_eof = false;
|
||||
};
|
||||
|
||||
/// For easy copying.
|
||||
struct MergeSortCursor {
|
||||
MergeSortCursorImpl* impl;
|
||||
ENABLE_FACTORY_CREATOR(MergeSortCursor);
|
||||
std::shared_ptr<MergeSortCursorImpl> impl;
|
||||
|
||||
MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
|
||||
MergeSortCursorImpl* operator->() const { return impl; }
|
||||
MergeSortCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {}
|
||||
MergeSortCursorImpl* operator->() const { return impl.get(); }
|
||||
|
||||
/// The specified row of this cursor is greater than the specified row of another cursor.
|
||||
int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const {
|
||||
@ -286,10 +271,11 @@ struct MergeSortCursor {
|
||||
|
||||
/// For easy copying.
|
||||
struct MergeSortBlockCursor {
|
||||
MergeSortCursorImpl* impl = nullptr;
|
||||
ENABLE_FACTORY_CREATOR(MergeSortBlockCursor);
|
||||
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
|
||||
|
||||
MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
|
||||
MergeSortCursorImpl* operator->() const { return impl; }
|
||||
MergeSortBlockCursor(std::shared_ptr<MergeSortCursorImpl> impl_) : impl(impl_) {}
|
||||
MergeSortCursorImpl* operator->() const { return impl.get(); }
|
||||
|
||||
/// The specified row of this cursor is greater than the specified row of another cursor.
|
||||
int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
|
||||
|
||||
@ -28,14 +28,6 @@
|
||||
#include "vec/core/column_with_type_and_name.h"
|
||||
#include "vec/utils/util.hpp"
|
||||
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
class VExprContext;
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
|
||||
using std::vector;
|
||||
|
||||
namespace doris::vectorized {
|
||||
|
||||
VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr,
|
||||
@ -68,13 +60,14 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) {
|
||||
_get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock");
|
||||
}
|
||||
|
||||
Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
|
||||
Status VSortedRunMerger::prepare(const std::vector<BlockSupplier>& input_runs) {
|
||||
try {
|
||||
for (const auto& supplier : input_runs) {
|
||||
if (_use_sort_desc) {
|
||||
_cursors.emplace_back(supplier, _desc);
|
||||
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, _desc));
|
||||
} else {
|
||||
_cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first);
|
||||
_cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(
|
||||
supplier, _ordering_expr, _is_asc_order, _nulls_first));
|
||||
}
|
||||
}
|
||||
} catch (const std::exception& e) {
|
||||
@ -82,15 +75,8 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>& input_runs) {
|
||||
}
|
||||
|
||||
for (auto& _cursor : _cursors) {
|
||||
if (!_cursor._is_eof) {
|
||||
_priority_queue.push(MergeSortCursor(&_cursor));
|
||||
}
|
||||
}
|
||||
|
||||
for (const auto& cursor : _cursors) {
|
||||
if (!cursor._is_eof) {
|
||||
_empty_block = cursor.create_empty_blocks();
|
||||
break;
|
||||
if (!_cursor->_is_eof) {
|
||||
_priority_queue.push(MergeSortCursor(_cursor));
|
||||
}
|
||||
}
|
||||
|
||||
@ -145,7 +131,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
|
||||
}
|
||||
} else {
|
||||
if (current->block_ptr() != nullptr) {
|
||||
for (int i = 0; i < current->all_columns.size(); i++) {
|
||||
for (int i = 0; i < current->block->columns(); i++) {
|
||||
auto& column_with_type = current->block_ptr()->get_by_position(i);
|
||||
column_with_type.column = column_with_type.column->cut(
|
||||
current->pos, current->rows - current->pos);
|
||||
@ -162,9 +148,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
size_t num_columns = _empty_block.columns();
|
||||
MutableBlock m_block =
|
||||
VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block);
|
||||
size_t num_columns = _priority_queue.top().impl->block->columns();
|
||||
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
|
||||
output_block, *_priority_queue.top().impl->block);
|
||||
MutableColumns& merged_columns = m_block.mutable_columns();
|
||||
|
||||
if (num_columns != merged_columns.size()) {
|
||||
|
||||
@ -30,9 +30,7 @@
|
||||
#include "vec/core/sort_description.h"
|
||||
#include "vec/exprs/vexpr_fwd.h"
|
||||
|
||||
namespace doris {
|
||||
|
||||
namespace vectorized {
|
||||
namespace doris::vectorized {
|
||||
|
||||
// VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted
|
||||
// sequence of blocks, which are fetched from a BlockSupplier function object.
|
||||
@ -78,14 +76,12 @@ protected:
|
||||
|
||||
bool _pipeline_engine_enabled = false;
|
||||
|
||||
std::vector<BlockSupplierSortCursorImpl> _cursors;
|
||||
std::vector<std::shared_ptr<BlockSupplierSortCursorImpl>> _cursors;
|
||||
std::priority_queue<MergeSortCursor> _priority_queue;
|
||||
|
||||
/// In pipeline engine, if a cursor needs to read one more block from supplier,
|
||||
/// we make it as a pending cursor until the supplier is readable.
|
||||
MergeSortCursorImpl* _pending_cursor = nullptr;
|
||||
|
||||
Block _empty_block;
|
||||
std::shared_ptr<MergeSortCursorImpl> _pending_cursor = nullptr;
|
||||
|
||||
// Times calls to get_next().
|
||||
RuntimeProfile::Counter* _get_next_timer = nullptr;
|
||||
@ -105,5 +101,4 @@ private:
|
||||
bool has_next_block(MergeSortCursor& current);
|
||||
};
|
||||
|
||||
} // namespace vectorized
|
||||
} // namespace doris
|
||||
} // namespace doris::vectorized
|
||||
|
||||
Reference in New Issue
Block a user