[refactor](node) refactor partition sort node to improve readability (#30511)

* [refactor](node) refactor partition sort node to improve readability

* update
This commit is contained in:
zhangstar333
2024-02-01 18:58:02 +08:00
committed by yiguolei
parent fd2d9ae63e
commit 65e277e365
8 changed files with 117 additions and 112 deletions

View File

@ -47,7 +47,7 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
p._top_n_algorithm, _shared_state->previous_row.get(), p._topn_phase);
p._top_n_algorithm, p._topn_phase);
_init_hash_method();
return Status::OK();
}
@ -122,8 +122,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._dependency->set_ready_to_read();
}
} else {
RETURN_IF_ERROR(
_split_block_by_partition(input_block, state->batch_size(), local_state));
RETURN_IF_ERROR(_split_block_by_partition(input_block, local_state,
source_state == SourceState::FINISHED));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
}
@ -135,19 +135,16 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
local_state._agg_arena_pool.reset(nullptr);
local_state._partitioned_data.reset(nullptr);
for (int i = 0; i < local_state._value_places.size(); ++i) {
auto sorter = vectorized::PartitionSorter::create_unique(
_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
_child_x->row_desc(), state, i == 0 ? local_state._profile : nullptr,
_has_global_limit, _partition_inner_limit, _top_n_algorithm,
local_state._shared_state->previous_row.get());
local_state._value_places[i]->create_or_reset_sorter_state();
auto sorter = std::move(local_state._value_places[i]->_partition_topn_sorter);
DCHECK(_child_x->row_desc().num_materialized_slots() ==
local_state._value_places[i]->blocks.back()->columns());
local_state._value_places[i]->_blocks.back()->columns());
//get blocks from every partition, and sorter get those data.
for (const auto& block : local_state._value_places[i]->blocks) {
for (const auto& block : local_state._value_places[i]->_blocks) {
RETURN_IF_ERROR(sorter->append_block(block.get()));
}
sorter->init_profile(local_state._profile);
local_state._value_places[i]->_blocks.clear();
RETURN_IF_ERROR(sorter->prepare_for_read());
local_state._shared_state->partition_sorts.push_back(std::move(sorter));
}
@ -165,7 +162,7 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}
Status PartitionSortSinkOperatorX::_split_block_by_partition(
vectorized::Block* input_block, int batch_size, PartitionSortSinkLocalState& local_state) {
vectorized::Block* input_block, PartitionSortSinkLocalState& local_state, bool eos) {
for (int i = 0; i < _partition_exprs_num; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
@ -173,15 +170,16 @@ Status PartitionSortSinkOperatorX::_split_block_by_partition(
local_state._partition_columns[i] =
input_block->get_by_position(result_column_id).column.get();
}
_emplace_into_hash_table(local_state._partition_columns, input_block, batch_size, local_state);
RETURN_IF_ERROR(_emplace_into_hash_table(local_state._partition_columns, input_block,
local_state, eos));
return Status::OK();
}
void PartitionSortSinkOperatorX::_emplace_into_hash_table(
Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block* input_block,
int batch_size, PartitionSortSinkLocalState& local_state) {
std::visit(
[&](auto&& agg_method) -> void {
PartitionSortSinkLocalState& local_state, bool eos) {
return std::visit(
[&](auto&& agg_method) -> Status {
SCOPED_TIMER(local_state._build_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
@ -213,10 +211,9 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
}
for (auto* place : local_state._value_places) {
SCOPED_TIMER(local_state._selector_block_timer);
place->append_block_by_selector(input_block, _child_x->row_desc(),
_has_global_limit, _partition_inner_limit,
batch_size);
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
}
return Status::OK();
},
local_state._partitioned_data->method_variant);
}

View File

@ -131,11 +131,11 @@ private:
bool _has_global_limit = false;
int64_t _partition_inner_limit = 0;
Status _split_block_by_partition(vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
void _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, int batch_size,
PartitionSortSinkLocalState& local_state);
Status _split_block_by_partition(vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state, bool eos);
Status _emplace_into_hash_table(const vectorized::ColumnRawPtrs& key_columns,
const vectorized::Block* input_block,
PartitionSortSinkLocalState& local_state, bool eos);
};
} // namespace pipeline

View File

@ -34,7 +34,6 @@ Status PartitionSortSourceLocalState::init(RuntimeState* state, LocalStateInfo&
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
_get_sorted_timer = ADD_TIMER(profile(), "GetSortedTime");
_shared_state->previous_row = std::make_unique<vectorized::SortCursorCmp>();
return Status::OK();
}
@ -95,7 +94,6 @@ Status PartitionSortSourceOperatorX::get_sorted_block(RuntimeState* state,
}
if (current_eos) {
//current sort have eos, so get next idx
local_state._shared_state->previous_row->reset();
auto rows = local_state._shared_state->partition_sorts[local_state._sort_idx]
->get_output_rows();
local_state._num_rows_returned += rows;

View File

@ -479,7 +479,6 @@ public:
std::queue<vectorized::Block> blocks_buffer;
std::mutex buffer_mutex;
std::vector<std::unique_ptr<vectorized::PartitionSorter>> partition_sorts;
std::unique_ptr<vectorized::SortCursorCmp> previous_row;
bool sink_eos = false;
std::mutex sink_eos_lock;
};

View File

@ -75,6 +75,14 @@ Status PartitionSorter::prepare_for_read() {
return Status::OK();
}
// Restore this sorter to its initial state so the same instance can be reused for
// another sort pass once the previous top-n result has been read out.
// `runtime_state` is forwarded to the newly created MergeSorterState.
void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
    // Swap with an empty queue to discard whatever the previous pass left in it.
    std::priority_queue<MergeSortBlockCursor> empty_queue;
    std::swap(_block_priority_queue, empty_queue);
    _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr);
    _previous_row->reset();
    // get_output_rows() reports this counter; clear it so rows emitted by an earlier
    // pass are not attributed to the next one.
    // NOTE(review): any other per-pass counters of this class that are not visible
    // here should presumably be cleared as well - confirm against the class definition.
    _output_total_rows = 0;
}
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
if (_state->get_sorted_block().empty()) {
*eos = true;

View File

@ -94,6 +94,7 @@ public:
Status partition_sort_read(Block* block, bool* eos, int batch_size);
int64 get_output_rows() const { return _output_total_rows; }
void reset_sorter_state(RuntimeState* runtime_state);
private:
std::unique_ptr<MergeSorterState> _state;

View File

@ -28,6 +28,7 @@
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/runtime_state.h"
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
@ -38,23 +39,59 @@
#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
Status PartitionBlocks::do_partition_topn_sort() {
// Append the rows of `input_block` picked by `_selector` to this partition's buffered
// blocks, update the row counters, and clear the selector. When enough rows have
// accumulated (and `eos` is false) an intermediate top-n sort is run on the buffer.
// Returns the status of that sort, or OK when no sort was needed.
Status PartitionBlocks::append_block_by_selector(const vectorized::Block* input_block, bool eos) {
    // Open a new buffered block if there is none yet or the current one is full.
    const bool need_new_block = _blocks.empty() || reach_limit();
    if (need_new_block) {
        _init_rows = _partition_sort_info->_runtime_state->batch_size();
        _blocks.push_back(Block::create_unique(
                VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc)));
    }

    auto src_columns = input_block->get_columns();
    auto dst_columns = _blocks.back()->mutate_columns();
    DCHECK(src_columns.size() == dst_columns.size());
    for (int col = 0; col < dst_columns.size(); ++col) {
        src_columns[col]->append_data_by_selector(dst_columns[col], _selector);
    }
    _blocks.back()->set_columns(std::move(dst_columns));

    // Bookkeeping: remaining capacity of the current block, lifetime total, and
    // rows received since the last intermediate sort.
    auto appended_rows = _selector.size();
    _init_rows -= appended_rows;
    _total_rows += appended_rows;
    _current_input_rows += appended_rows;
    _selector.clear();

    // maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
    const bool skip_partial_topn =
            eos || _partition_sort_info->_partition_inner_limit == -1 ||
            _current_input_rows < PARTITION_SORT_ROWS_THRESHOLD ||
            _partition_sort_info->_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL;
    if (skip_partial_topn) {
        return Status::OK();
    }

    create_or_reset_sorter_state();
    RETURN_IF_ERROR(do_partition_topn_sort());
    _current_input_rows = 0; // reset record
    _do_partition_topn_count++;
    return Status::OK();
}
void PartitionBlocks::create_or_reset_sorter_state() {
if (_partition_topn_sorter == nullptr) {
_previous_row = std::make_unique<SortCursorCmp>();
_partition_topn_sorter = PartitionSorter::create_unique(
*_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit,
_partition_sort_info->_offset, _partition_sort_info->_pool,
_partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first,
_partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, nullptr,
_partition_sort_info->_row_desc, _partition_sort_info->_runtime_state,
_is_first_sorter ? _partition_sort_info->_runtime_profile : nullptr,
_partition_sort_info->_has_global_limit,
_partition_sort_info->_partition_inner_limit,
_partition_sort_info->_top_n_algorithm, _partition_sort_info->_previous_row);
_partition_sort_info->_top_n_algorithm, _previous_row.get());
_partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
} else {
_partition_topn_sorter->reset_sorter_state(_partition_sort_info->_runtime_state);
}
}
for (const auto& block : blocks) {
Status PartitionBlocks::do_partition_topn_sort() {
for (const auto& block : _blocks) {
RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
}
blocks.clear();
_partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
_blocks.clear();
RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
bool current_eos = false;
size_t current_output_rows = 0;
@ -67,14 +104,11 @@ Status PartitionBlocks::do_partition_topn_sort() {
auto rows = output_block->rows();
if (rows > 0) {
current_output_rows += rows;
blocks.emplace_back(std::move(output_block));
_blocks.emplace_back(std::move(output_block));
}
}
_topn_filter_rows += (_current_input_rows - current_output_rows);
_partition_sort_info->_previous_row->reset();
_partition_topn_sorter.reset(nullptr);
return Status::OK();
}
@ -83,7 +117,6 @@ VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode,
: ExecNode(pool, tnode, descs), _hash_table_size_counter(nullptr) {
_partitioned_data = std::make_unique<PartitionedHashMapVariants>();
_agg_arena_pool = std::make_unique<Arena>();
_previous_row = std::make_unique<SortCursorCmp>();
}
Status VPartitionSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
@ -130,27 +163,26 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
_partition_sort_info = std::make_shared<PartitionSortInfo>(
&_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, child(0)->row_desc(),
state, _runtime_profile.get(), _has_global_limit, _partition_inner_limit,
_top_n_algorithm, _previous_row.get(), _topn_phase);
_top_n_algorithm, _topn_phase);
return Status::OK();
}
Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block,
int batch_size) {
Status VPartitionSortNode::_split_block_by_partition(vectorized::Block* input_block, bool eos) {
for (int i = 0; i < _partition_exprs_num; ++i) {
int result_column_id = -1;
RETURN_IF_ERROR(_partition_expr_ctxs[i]->execute(input_block, &result_column_id));
DCHECK(result_column_id != -1);
_partition_columns[i] = input_block->get_by_position(result_column_id).column.get();
}
_emplace_into_hash_table(_partition_columns, input_block, batch_size);
RETURN_IF_ERROR(_emplace_into_hash_table(_partition_columns, input_block, eos));
return Status::OK();
}
void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns,
const vectorized::Block* input_block,
int batch_size) {
std::visit(
[&](auto&& agg_method) -> void {
Status VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_columns,
const vectorized::Block* input_block,
bool eos) {
return std::visit(
[&](auto&& agg_method) -> Status {
SCOPED_TIMER(_build_timer);
using HashMethodType = std::decay_t<decltype(agg_method)>;
using AggState = typename HashMethodType::State;
@ -183,10 +215,9 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum
SCOPED_TIMER(_selector_block_timer);
for (auto* place : _value_places) {
place->append_block_by_selector(input_block, child(0)->row_desc(),
_has_global_limit, _partition_inner_limit,
batch_size);
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
}
return Status::OK();
},
_partitioned_data->method_variant);
}
@ -214,7 +245,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
_blocks_buffer.push(std::move(*input_block));
}
} else {
RETURN_IF_ERROR(_split_block_by_partition(input_block, state->batch_size()));
RETURN_IF_ERROR(_split_block_by_partition(input_block, eos));
RETURN_IF_CANCELLED(state);
input_block->clear_column_data();
}
@ -227,21 +258,18 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
_partitioned_data.reset(nullptr);
SCOPED_TIMER(_partition_sort_timer);
for (int i = 0; i < _value_places.size(); ++i) {
auto sorter = PartitionSorter::create_unique(
_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first,
child(0)->row_desc(), state, i == 0 ? _runtime_profile.get() : nullptr,
_has_global_limit, _partition_inner_limit, _top_n_algorithm,
_previous_row.get());
_value_places[i]->create_or_reset_sorter_state();
auto sorter = std::ref(_value_places[i]->_partition_topn_sorter);
DCHECK(child(0)->row_desc().num_materialized_slots() ==
_value_places[i]->blocks.back()->columns());
_value_places[i]->_blocks.back()->columns());
//get blocks from every partition, and sorter get those data.
for (const auto& block : _value_places[i]->blocks) {
RETURN_IF_ERROR(sorter->append_block(block.get()));
for (const auto& block : _value_places[i]->_blocks) {
RETURN_IF_ERROR(sorter.get()->append_block(block.get()));
}
sorter->init_profile(_runtime_profile.get());
RETURN_IF_ERROR(sorter->prepare_for_read());
_partition_sorts.push_back(std::move(sorter));
_value_places[i]->_blocks.clear();
RETURN_IF_ERROR(sorter.get()->prepare_for_read());
_partition_sorts.push_back(std::move(sorter.get()));
}
COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
@ -269,7 +297,7 @@ Status VPartitionSortNode::open(RuntimeState* state) {
RETURN_IF_ERROR(sink(state, input_block.get(), eos));
} while (!eos);
static_cast<void>(child(0)->close(state));
RETURN_IF_ERROR(child(0)->close(state));
return Status::OK();
}
@ -342,7 +370,6 @@ Status VPartitionSortNode::get_sorted_block(RuntimeState* state, Block* output_b
}
if (*current_eos) {
//current sort have eos, so get next idx
_previous_row->reset();
auto rows = _partition_sorts[_sort_idx]->get_output_rows();
partition_profile_output_rows.push_back(rows);
_num_rows_returned += rows;
@ -436,22 +463,22 @@ void VPartitionSortNode::_init_hash_method() {
}
void VPartitionSortNode::debug_profile() {
fmt::memory_buffer partition_rows_read, partition_blocks_read, partition_filter_rows;
fmt::memory_buffer partition_rows_read, partition_topn_count, partition_filter_rows;
fmt::format_to(partition_rows_read, "[");
fmt::format_to(partition_blocks_read, "[");
fmt::format_to(partition_topn_count, "[");
fmt::format_to(partition_filter_rows, "[");
for (auto* place : _value_places) {
fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
fmt::format_to(partition_filter_rows, "{}, ", place->get_topn_filter_rows());
fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
fmt::format_to(partition_topn_count, "{}, ", place->get_do_topn_count());
}
fmt::format_to(partition_rows_read, "]");
fmt::format_to(partition_blocks_read, "]");
fmt::format_to(partition_topn_count, "]");
fmt::format_to(partition_filter_rows, "]");
runtime_profile()->add_info_string("PerPartitionBlocksRead",
fmt::to_string(partition_blocks_read));
runtime_profile()->add_info_string("PerPartitionDoTopNCount",
fmt::to_string(partition_topn_count));
runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read));
runtime_profile()->add_info_string("PerPartitionFilterRows",
fmt::to_string(partition_filter_rows));

View File

@ -48,8 +48,7 @@ struct PartitionSortInfo {
const std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
RuntimeState* runtime_state, RuntimeProfile* runtime_profile,
bool has_global_limit, int64_t partition_inner_limit,
TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row,
TPartTopNPhase::type topn_phase)
TopNAlgorithm::type top_n_algorithm, TPartTopNPhase::type topn_phase)
: _vsort_exec_exprs(vsort_exec_exprs),
_limit(limit),
_offset(offset),
@ -62,13 +61,12 @@ struct PartitionSortInfo {
_has_global_limit(has_global_limit),
_partition_inner_limit(partition_inner_limit),
_top_n_algorithm(top_n_algorithm),
_previous_row(previous_row),
_topn_phase(topn_phase) {}
public:
VSortExecExprs* _vsort_exec_exprs = nullptr;
int64_t _limit = -1;
int64_t _offset = -1;
int64_t _offset = 0;
ObjectPool* _pool = nullptr;
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
@ -78,69 +76,47 @@ public:
bool _has_global_limit = false;
int64_t _partition_inner_limit = 0;
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
SortCursorCmp* _previous_row = nullptr;
TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL;
};
struct PartitionBlocks {
public:
PartitionBlocks() = default; //should fixed in pipelineX
PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, bool is_first_sorter)
: _is_first_sorter(is_first_sorter), _partition_sort_info(partition_sort_info) {}
~PartitionBlocks() = default;
void add_row_idx(size_t row) { selector.push_back(row); }
void add_row_idx(size_t row) { _selector.push_back(row); }
void append_block_by_selector(const vectorized::Block* input_block,
const RowDescriptor& row_desc, bool is_limit,
int64_t partition_inner_limit, int batch_size) {
if (blocks.empty() || reach_limit()) {
_init_rows = batch_size;
blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
}
auto columns = input_block->get_columns();
auto mutable_columns = blocks.back()->mutate_columns();
DCHECK(columns.size() == mutable_columns.size());
for (int i = 0; i < mutable_columns.size(); ++i) {
columns[i]->append_data_by_selector(mutable_columns[i], selector);
}
blocks.back()->set_columns(std::move(mutable_columns));
auto selector_rows = selector.size();
_init_rows = _init_rows - selector_rows;
_total_rows = _total_rows + selector_rows;
_current_input_rows = _current_input_rows + selector_rows;
selector.clear();
// maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
_partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) {
static_cast<void>(do_partition_topn_sort()); // fixed : should return status
_current_input_rows = 0; // reset record
}
}
Status append_block_by_selector(const vectorized::Block* input_block, bool eos);
Status do_partition_topn_sort();
void create_or_reset_sorter_state();
void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) {
auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
empty_block->swap(*input_block);
blocks.emplace_back(std::move(empty_block));
_blocks.emplace_back(std::move(empty_block));
}
bool reach_limit() {
return _init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
return _init_rows <= 0 || _blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
}
size_t get_total_rows() const { return _total_rows; }
size_t get_topn_filter_rows() const { return _topn_filter_rows; }
size_t get_do_topn_count() const { return _do_partition_topn_count; }
IColumn::Selector selector;
std::vector<std::unique_ptr<Block>> blocks;
IColumn::Selector _selector;
std::vector<std::unique_ptr<Block>> _blocks;
size_t _total_rows = 0;
size_t _current_input_rows = 0;
size_t _topn_filter_rows = 0;
size_t _do_partition_topn_count = 0;
int _init_rows = 4096;
bool _is_first_sorter = false;
std::unique_ptr<SortCursorCmp> _previous_row;
std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr;
std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
};
@ -261,9 +237,9 @@ public:
private:
void _init_hash_method();
Status _split_block_by_partition(vectorized::Block* input_block, int batch_size);
void _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, int batch_size);
Status _split_block_by_partition(vectorized::Block* input_block, bool eos);
Status _emplace_into_hash_table(const ColumnRawPtrs& key_columns,
const vectorized::Block* input_block, bool eos);
Status get_sorted_block(RuntimeState* state, Block* output_block, bool* eos);
// hash table
@ -286,7 +262,6 @@ private:
int _num_partition = 0;
int64_t _partition_inner_limit = 0;
int _sort_idx = 0;
std::unique_ptr<SortCursorCmp> _previous_row;
std::queue<Block> _blocks_buffer;
int64_t child_input_rows = 0;
std::mutex _buffer_mutex;