[improve](node) refactor partition sort node to reduce memory use
pipelineX
This commit is contained in:
@ -44,6 +44,10 @@ Status PartitionSortSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
|
||||
_build_timer = ADD_TIMER(_profile, "HashTableBuildTime");
|
||||
_selector_block_timer = ADD_TIMER(_profile, "SelectorBlockTime");
|
||||
_emplace_key_timer = ADD_TIMER(_profile, "EmplaceKeyTime");
|
||||
_partition_sort_info = std::make_shared<vectorized::PartitionSortInfo>(
|
||||
&_vsort_exec_exprs, p._limit, 0, p._pool, p._is_asc_order, p._nulls_first,
|
||||
p._child_x->row_desc(), state, _profile, p._has_global_limit, p._partition_inner_limit,
|
||||
p._top_n_algorithm, _shared_state->previous_row.get(), p._topn_phase);
|
||||
_init_hash_method();
|
||||
return Status::OK();
|
||||
}
|
||||
@ -100,7 +104,8 @@ Status PartitionSortSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
|
||||
local_state.child_input_rows = local_state.child_input_rows + current_rows;
|
||||
if (UNLIKELY(_partition_exprs_num == 0)) {
|
||||
if (UNLIKELY(local_state._value_places.empty())) {
|
||||
local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks()));
|
||||
local_state._value_places.push_back(_pool->add(new vectorized::PartitionBlocks(
|
||||
local_state._partition_sort_info, local_state._value_places.empty())));
|
||||
}
|
||||
//no partition key
|
||||
local_state._value_places[0]->append_whole_block(input_block, _child_x->row_desc());
|
||||
@ -187,13 +192,15 @@ void PartitionSortSinkOperatorX::_emplace_into_hash_table(
|
||||
|
||||
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
|
||||
HashMethodType::try_presis_key(key, origin, *local_state._agg_arena_pool);
|
||||
auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks());
|
||||
auto* aggregate_data = _pool->add(new vectorized::PartitionBlocks(
|
||||
local_state._partition_sort_info, local_state._value_places.empty()));
|
||||
local_state._value_places.push_back(aggregate_data);
|
||||
ctor(key, aggregate_data);
|
||||
local_state._num_partition++;
|
||||
};
|
||||
auto creator_for_null_key = [&](auto& mapped) {
|
||||
mapped = _pool->add(new vectorized::PartitionBlocks());
|
||||
mapped = _pool->add(new vectorized::PartitionBlocks(
|
||||
local_state._partition_sort_info, local_state._value_places.empty()));
|
||||
local_state._value_places.push_back(mapped);
|
||||
local_state._num_partition++;
|
||||
};
|
||||
|
||||
@ -81,6 +81,7 @@ private:
|
||||
std::unique_ptr<vectorized::PartitionedHashMapVariants> _partitioned_data;
|
||||
std::unique_ptr<vectorized::Arena> _agg_arena_pool;
|
||||
int _partition_exprs_num = 0;
|
||||
std::shared_ptr<vectorized::PartitionSortInfo> _partition_sort_info = nullptr;
|
||||
|
||||
RuntimeProfile::Counter* _build_timer = nullptr;
|
||||
RuntimeProfile::Counter* _emplace_key_timer = nullptr;
|
||||
|
||||
@ -33,10 +33,50 @@
|
||||
#include "vec/common/hash_table/hash_map_context_creator.h"
|
||||
#include "vec/common/hash_table/hash_set.h"
|
||||
#include "vec/common/hash_table/partitioned_hash_map.h"
|
||||
#include "vec/core/block.h"
|
||||
#include "vec/exprs/vexpr.h"
|
||||
#include "vec/exprs/vexpr_context.h"
|
||||
|
||||
namespace doris::vectorized {
|
||||
/// Runs one partition-level top-n pass over the rows currently buffered in `blocks`.
///
/// Steps, in order:
///   1. Create `_partition_topn_sorter` from the shared `_partition_sort_info`
///      if it does not exist yet.
///   2. Append every buffered block to the sorter, then clear `blocks`.
///   3. Attach the profile, call `prepare_for_read()`, and pull result blocks
///      with `get_next()` until it reports eos; non-empty results are stored
///      back into `blocks`.
///   4. Add (`_current_input_rows` - rows produced) to `_topn_filter_rows`,
///      reset the shared previous-row cursor and destroy the sorter.
///
/// @return Status::OK() when the pass completes; otherwise the status
///         propagated by RETURN_IF_ERROR from the failing sorter call.
///         NOTE(review): a failure after `blocks.clear()` leaves `blocks`
///         without the rows that were handed to the sorter.
Status PartitionBlocks::do_partition_topn_sort() {
    // The sorter is destroyed at the end of every pass, so it is rebuilt here
    // from the parameters shared through _partition_sort_info.
    if (_partition_topn_sorter == nullptr) {
        _partition_topn_sorter = PartitionSorter::create_unique(
                *_partition_sort_info->_vsort_exec_exprs, _partition_sort_info->_limit,
                _partition_sort_info->_offset, _partition_sort_info->_pool,
                _partition_sort_info->_is_asc_order, _partition_sort_info->_nulls_first,
                _partition_sort_info->_row_desc, _partition_sort_info->_runtime_state, nullptr,
                _partition_sort_info->_has_global_limit,
                _partition_sort_info->_partition_inner_limit,
                _partition_sort_info->_top_n_algorithm, _partition_sort_info->_previous_row);
    }

    // Hand all buffered input to the sorter before dropping our references to it.
    for (const auto& block : blocks) {
        RETURN_IF_ERROR(_partition_topn_sorter->append_block(block.get()));
    }
    blocks.clear();
    _partition_topn_sorter->init_profile(_partition_sort_info->_runtime_profile);
    RETURN_IF_ERROR(_partition_topn_sorter->prepare_for_read());
    bool current_eos = false;
    size_t current_output_rows = 0;
    while (!current_eos) {
        // output_block maybe need better way
        auto output_block = Block::create_unique(
                VectorizedUtils::create_empty_block(_partition_sort_info->_row_desc));
        // current_eos is an out-parameter: it must be passed by address so the
        // sorter can terminate this loop.
        RETURN_IF_ERROR(_partition_topn_sorter->get_next(_partition_sort_info->_runtime_state,
                                                         output_block.get(), &current_eos));
        auto rows = output_block->rows();
        if (rows > 0) {
            current_output_rows += rows;
            blocks.emplace_back(std::move(output_block));
        }
    }

    // Rows that went into the sorter but did not come back out were filtered.
    _topn_filter_rows += (_current_input_rows - current_output_rows);
    _partition_sort_info->_previous_row->reset();
    _partition_topn_sorter.reset(nullptr);

    return Status::OK();
}
|
||||
|
||||
VPartitionSortNode::VPartitionSortNode(ObjectPool* pool, const TPlanNode& tnode,
|
||||
const DescriptorTbl& descs)
|
||||
@ -79,6 +119,7 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
|
||||
_get_sorted_timer = ADD_TIMER(runtime_profile(), "GetSortedTime");
|
||||
_selector_block_timer = ADD_TIMER(runtime_profile(), "SelectorBlockTime");
|
||||
_emplace_key_timer = ADD_TIMER(runtime_profile(), "EmplaceKeyTime");
|
||||
runtime_profile()->add_info_string("CurrentTopNPhase", std::to_string(_topn_phase));
|
||||
|
||||
RETURN_IF_ERROR(ExecNode::prepare(state));
|
||||
SCOPED_TIMER(_exec_timer);
|
||||
@ -86,6 +127,10 @@ Status VPartitionSortNode::prepare(RuntimeState* state) {
|
||||
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, child(0)->row_desc()));
|
||||
_init_hash_method();
|
||||
|
||||
_partition_sort_info = std::make_shared<PartitionSortInfo>(
|
||||
&_vsort_exec_exprs, _limit, 0, _pool, _is_asc_order, _nulls_first, child(0)->row_desc(),
|
||||
state, _runtime_profile.get(), _has_global_limit, _partition_inner_limit,
|
||||
_top_n_algorithm, _previous_row.get(), _topn_phase);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
@ -116,13 +161,15 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum
|
||||
|
||||
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
|
||||
HashMethodType::try_presis_key(key, origin, *_agg_arena_pool);
|
||||
auto* aggregate_data = _pool->add(new PartitionBlocks());
|
||||
auto* aggregate_data = _pool->add(
|
||||
new PartitionBlocks(_partition_sort_info, _value_places.empty()));
|
||||
_value_places.push_back(aggregate_data);
|
||||
ctor(key, aggregate_data);
|
||||
_num_partition++;
|
||||
};
|
||||
auto creator_for_null_key = [&](auto& mapped) {
|
||||
mapped = _pool->add(new PartitionBlocks());
|
||||
mapped = _pool->add(
|
||||
new PartitionBlocks(_partition_sort_info, _value_places.empty()));
|
||||
_value_places.push_back(mapped);
|
||||
_num_partition++;
|
||||
};
|
||||
@ -135,7 +182,7 @@ void VPartitionSortNode::_emplace_into_hash_table(const ColumnRawPtrs& key_colum
|
||||
}
|
||||
|
||||
SCOPED_TIMER(_selector_block_timer);
|
||||
for (auto place : _value_places) {
|
||||
for (auto* place : _value_places) {
|
||||
place->append_block_by_selector(input_block, child(0)->row_desc(),
|
||||
_has_global_limit, _partition_inner_limit,
|
||||
batch_size);
|
||||
@ -151,7 +198,8 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
|
||||
child_input_rows = child_input_rows + current_rows;
|
||||
if (UNLIKELY(_partition_exprs_num == 0)) {
|
||||
if (UNLIKELY(_value_places.empty())) {
|
||||
_value_places.push_back(_pool->add(new PartitionBlocks()));
|
||||
_value_places.push_back(_pool->add(
|
||||
new PartitionBlocks(_partition_sort_info, _value_places.empty())));
|
||||
}
|
||||
//no partition key
|
||||
_value_places[0]->append_whole_block(input_block, child(0)->row_desc());
|
||||
@ -195,9 +243,7 @@ Status VPartitionSortNode::sink(RuntimeState* state, vectorized::Block* input_bl
|
||||
RETURN_IF_ERROR(sorter->prepare_for_read());
|
||||
_partition_sorts.push_back(std::move(sorter));
|
||||
}
|
||||
if (state->enable_profile()) {
|
||||
debug_profile();
|
||||
}
|
||||
|
||||
COUNTER_SET(_hash_table_size_counter, int64_t(_num_partition));
|
||||
//so all data from child have sink completed
|
||||
_can_read = true;
|
||||
@ -312,14 +358,15 @@ Status VPartitionSortNode::close(RuntimeState* state) {
|
||||
if (is_closed()) {
|
||||
return Status::OK();
|
||||
}
|
||||
if (state->enable_profile()) {
|
||||
debug_profile();
|
||||
}
|
||||
|
||||
return ExecNode::close(state);
|
||||
}
|
||||
|
||||
void VPartitionSortNode::release_resource(RuntimeState* state) {
|
||||
_vsort_exec_exprs.close(state);
|
||||
if (state->enable_profile()) {
|
||||
debug_profile();
|
||||
}
|
||||
ExecNode::release_resource(state);
|
||||
}
|
||||
|
||||
@ -389,19 +436,25 @@ void VPartitionSortNode::_init_hash_method() {
|
||||
}
|
||||
|
||||
void VPartitionSortNode::debug_profile() {
|
||||
fmt::memory_buffer partition_rows_read, partition_blocks_read;
|
||||
fmt::memory_buffer partition_rows_read, partition_blocks_read, partition_filter_rows;
|
||||
fmt::format_to(partition_rows_read, "[");
|
||||
fmt::format_to(partition_blocks_read, "[");
|
||||
for (auto place : _value_places) {
|
||||
fmt::format_to(partition_filter_rows, "[");
|
||||
|
||||
for (auto* place : _value_places) {
|
||||
fmt::format_to(partition_rows_read, "{}, ", place->get_total_rows());
|
||||
fmt::format_to(partition_filter_rows, "{}, ", place->get_topn_filter_rows());
|
||||
fmt::format_to(partition_blocks_read, "{}, ", place->blocks.size());
|
||||
}
|
||||
fmt::format_to(partition_rows_read, "]");
|
||||
fmt::format_to(partition_blocks_read, "]");
|
||||
fmt::format_to(partition_filter_rows, "]");
|
||||
|
||||
runtime_profile()->add_info_string("PerPartitionBlocksRead",
|
||||
fmt::to_string(partition_blocks_read));
|
||||
runtime_profile()->add_info_string("PerPartitionRowsRead", fmt::to_string(partition_rows_read));
|
||||
runtime_profile()->add_info_string("PerPartitionFilterRows",
|
||||
fmt::to_string(partition_filter_rows));
|
||||
fmt::memory_buffer partition_output_rows;
|
||||
fmt::format_to(partition_output_rows, "[");
|
||||
for (auto row : partition_profile_output_rows) {
|
||||
|
||||
@ -23,6 +23,7 @@
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
#include "common/status.h"
|
||||
#include "exec/exec_node.h"
|
||||
#include "vec/columns/column.h"
|
||||
#include "vec/common/columns_hashing.h"
|
||||
@ -37,10 +38,55 @@
|
||||
namespace doris {
|
||||
namespace vectorized {
|
||||
static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20;
|
||||
static constexpr size_t PARTITION_SORT_ROWS_THRESHOLD = 20000;
|
||||
|
||||
struct PartitionSortInfo {
|
||||
~PartitionSortInfo() = default;
|
||||
|
||||
PartitionSortInfo(VSortExecExprs* vsort_exec_exprs, int64_t limit, int64_t offset,
|
||||
ObjectPool* pool, const std::vector<bool>& is_asc_order,
|
||||
const std::vector<bool>& nulls_first, const RowDescriptor& row_desc,
|
||||
RuntimeState* runtime_state, RuntimeProfile* runtime_profile,
|
||||
bool has_global_limit, int64_t partition_inner_limit,
|
||||
TopNAlgorithm::type top_n_algorithm, SortCursorCmp* previous_row,
|
||||
TPartTopNPhase::type topn_phase)
|
||||
: _vsort_exec_exprs(vsort_exec_exprs),
|
||||
_limit(limit),
|
||||
_offset(offset),
|
||||
_pool(pool),
|
||||
_is_asc_order(is_asc_order),
|
||||
_nulls_first(nulls_first),
|
||||
_row_desc(row_desc),
|
||||
_runtime_state(runtime_state),
|
||||
_runtime_profile(runtime_profile),
|
||||
_has_global_limit(has_global_limit),
|
||||
_partition_inner_limit(partition_inner_limit),
|
||||
_top_n_algorithm(top_n_algorithm),
|
||||
_previous_row(previous_row),
|
||||
_topn_phase(topn_phase) {}
|
||||
|
||||
public:
|
||||
VSortExecExprs* _vsort_exec_exprs = nullptr;
|
||||
int64_t _limit = -1;
|
||||
int64_t _offset = -1;
|
||||
ObjectPool* _pool = nullptr;
|
||||
std::vector<bool> _is_asc_order;
|
||||
std::vector<bool> _nulls_first;
|
||||
const RowDescriptor& _row_desc;
|
||||
RuntimeState* _runtime_state = nullptr;
|
||||
RuntimeProfile* _runtime_profile = nullptr;
|
||||
bool _has_global_limit = false;
|
||||
int64_t _partition_inner_limit = 0;
|
||||
TopNAlgorithm::type _top_n_algorithm = TopNAlgorithm::ROW_NUMBER;
|
||||
SortCursorCmp* _previous_row = nullptr;
|
||||
TPartTopNPhase::type _topn_phase = TPartTopNPhase::TWO_PHASE_GLOBAL;
|
||||
};
|
||||
|
||||
struct PartitionBlocks {
|
||||
public:
|
||||
PartitionBlocks() = default;
|
||||
PartitionBlocks() = default; //should fixed in pipelineX
|
||||
PartitionBlocks(std::shared_ptr<PartitionSortInfo> partition_sort_info, bool is_first_sorter)
|
||||
: _is_first_sorter(is_first_sorter), _partition_sort_info(partition_sort_info) {}
|
||||
~PartitionBlocks() = default;
|
||||
|
||||
void add_row_idx(size_t row) { selector.push_back(row); }
|
||||
@ -49,7 +95,7 @@ public:
|
||||
const RowDescriptor& row_desc, bool is_limit,
|
||||
int64_t partition_inner_limit, int batch_size) {
|
||||
if (blocks.empty() || reach_limit()) {
|
||||
init_rows = batch_size;
|
||||
_init_rows = batch_size;
|
||||
blocks.push_back(Block::create_unique(VectorizedUtils::create_empty_block(row_desc)));
|
||||
}
|
||||
auto columns = input_block->get_columns();
|
||||
@ -59,11 +105,21 @@ public:
|
||||
columns[i]->append_data_by_selector(mutable_columns[i], selector);
|
||||
}
|
||||
blocks.back()->set_columns(std::move(mutable_columns));
|
||||
init_rows = init_rows - selector.size();
|
||||
total_rows = total_rows + selector.size();
|
||||
auto selector_rows = selector.size();
|
||||
_init_rows = _init_rows - selector_rows;
|
||||
_total_rows = _total_rows + selector_rows;
|
||||
_current_input_rows = _current_input_rows + selector_rows;
|
||||
selector.clear();
|
||||
// maybe better could change by user PARTITION_SORT_ROWS_THRESHOLD
|
||||
if (_current_input_rows >= PARTITION_SORT_ROWS_THRESHOLD &&
|
||||
_partition_sort_info->_topn_phase != TPartTopNPhase::TWO_PHASE_GLOBAL) {
|
||||
static_cast<void>(do_partition_topn_sort()); // fixed : should return status
|
||||
_current_input_rows = 0; // reset record
|
||||
}
|
||||
}
|
||||
|
||||
Status do_partition_topn_sort();
|
||||
|
||||
void append_whole_block(vectorized::Block* input_block, const RowDescriptor& row_desc) {
|
||||
auto empty_block = Block::create_unique(VectorizedUtils::create_empty_block(row_desc));
|
||||
empty_block->swap(*input_block);
|
||||
@ -71,15 +127,22 @@ public:
|
||||
}
|
||||
|
||||
bool reach_limit() {
|
||||
return init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
|
||||
return _init_rows <= 0 || blocks.back()->bytes() > INITIAL_BUFFERED_BLOCK_BYTES;
|
||||
}
|
||||
|
||||
size_t get_total_rows() const { return total_rows; }
|
||||
size_t get_total_rows() const { return _total_rows; }
|
||||
size_t get_topn_filter_rows() const { return _topn_filter_rows; }
|
||||
|
||||
IColumn::Selector selector;
|
||||
std::vector<std::unique_ptr<Block>> blocks;
|
||||
size_t total_rows = 0;
|
||||
int init_rows = 4096;
|
||||
size_t _total_rows = 0;
|
||||
size_t _current_input_rows = 0;
|
||||
size_t _topn_filter_rows = 0;
|
||||
int _init_rows = 4096;
|
||||
bool _is_first_sorter = false;
|
||||
|
||||
std::unique_ptr<PartitionSorter> _partition_topn_sorter = nullptr;
|
||||
std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
|
||||
};
|
||||
|
||||
using PartitionDataPtr = PartitionBlocks*;
|
||||
@ -213,6 +276,7 @@ private:
|
||||
|
||||
std::vector<std::unique_ptr<PartitionSorter>> _partition_sorts;
|
||||
std::vector<PartitionDataPtr> _value_places;
|
||||
std::shared_ptr<PartitionSortInfo> _partition_sort_info = nullptr;
|
||||
// Expressions and parameters used for build _sort_description
|
||||
VSortExecExprs _vsort_exec_exprs;
|
||||
std::vector<bool> _is_asc_order;
|
||||
|
||||
Reference in New Issue
Block a user