[improvement](analytic) improve memory counter (#14890)
This commit is contained in:
@ -287,8 +287,8 @@ Status VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* bl
|
||||
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
|
||||
size_t current_block_rows = _input_blocks[_output_block_index].rows();
|
||||
if (next_partition) {
|
||||
_executor.execute(_partition_by_start, _partition_by_end, _partition_by_start,
|
||||
_partition_by_end);
|
||||
_executor.execute(_partition_by_start.pos, _partition_by_end.pos,
|
||||
_partition_by_start.pos, _partition_by_end.pos);
|
||||
}
|
||||
_executor.insert_result(current_block_rows);
|
||||
if (_window_end_position == current_block_rows) {
|
||||
@ -312,7 +312,8 @@ Status VAnalyticEvalNode::_get_next_for_range(RuntimeState* state, Block* block,
|
||||
_window_end_position < current_block_rows) {
|
||||
if (_current_row_position >= _order_by_end.pos) {
|
||||
_update_order_by_range();
|
||||
_executor.execute(_order_by_start, _order_by_end, _order_by_start, _order_by_end);
|
||||
_executor.execute(_order_by_start.pos, _order_by_end.pos, _order_by_start.pos,
|
||||
_order_by_end.pos);
|
||||
}
|
||||
_executor.insert_result(current_block_rows);
|
||||
}
|
||||
@ -335,25 +336,26 @@ Status VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block,
|
||||
size_t current_block_rows = _input_blocks[_output_block_index].rows();
|
||||
while (_current_row_position < _partition_by_end.pos &&
|
||||
_window_end_position < current_block_rows) {
|
||||
BlockRowPos range_start, range_end;
|
||||
int64_t range_start, range_end;
|
||||
if (!_window.__isset.window_start &&
|
||||
_window.window_end.type ==
|
||||
TAnalyticWindowBoundaryType::
|
||||
CURRENT_ROW) { //[preceding, current_row],[current_row, following]
|
||||
range_start.pos = _current_row_position;
|
||||
range_end.pos = _current_row_position +
|
||||
1; //going on calculate,add up data, no need to reset state
|
||||
range_start = _current_row_position;
|
||||
range_end = _current_row_position +
|
||||
1; //going on calculate,add up data, no need to reset state
|
||||
} else {
|
||||
_reset_agg_status();
|
||||
if (!_window.__isset
|
||||
.window_start) { //[preceding, offset] --unbound: [preceding, following]
|
||||
range_start.pos = _partition_by_start.pos;
|
||||
range_start = _partition_by_start.pos;
|
||||
} else {
|
||||
range_start.pos = _current_row_position + _rows_start_offset;
|
||||
range_start = _current_row_position + _rows_start_offset;
|
||||
}
|
||||
range_end.pos = _current_row_position + _rows_end_offset + 1;
|
||||
range_end = _current_row_position + _rows_end_offset + 1;
|
||||
}
|
||||
_executor.execute(_partition_by_start, _partition_by_end, range_start, range_end);
|
||||
_executor.execute(_partition_by_start.pos, _partition_by_end.pos, range_start,
|
||||
range_end);
|
||||
_executor.insert_result(current_block_rows);
|
||||
}
|
||||
if (_window_end_position == current_block_rows) {
|
||||
@ -595,6 +597,7 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
|
||||
|
||||
block->swap(std::move(_input_blocks[_output_block_index]));
|
||||
_blocks_memory_usage->add(-block->allocated_bytes());
|
||||
mem_tracker_held()->consume(-block->allocated_bytes());
|
||||
if (_origin_cols.size() < block->columns()) {
|
||||
block->erase_not_in(_origin_cols);
|
||||
}
|
||||
@ -618,16 +621,15 @@ Status VAnalyticEvalNode::_output_current_block(Block* block) {
|
||||
|
||||
//now is execute for lead/lag row_number/rank/dense_rank/ntile functions
|
||||
//sum min max count avg first_value last_value functions
|
||||
void VAnalyticEvalNode::_execute_for_win_func(BlockRowPos partition_start,
|
||||
BlockRowPos partition_end, BlockRowPos frame_start,
|
||||
BlockRowPos frame_end) {
|
||||
void VAnalyticEvalNode::_execute_for_win_func(int64_t partition_start, int64_t partition_end,
|
||||
int64_t frame_start, int64_t frame_end) {
|
||||
for (size_t i = 0; i < _agg_functions_size; ++i) {
|
||||
std::vector<const IColumn*> _agg_columns;
|
||||
for (int j = 0; j < _agg_intput_columns[i].size(); ++j) {
|
||||
_agg_columns.push_back(_agg_intput_columns[i][j].get());
|
||||
}
|
||||
_agg_functions[i]->function()->add_range_single_place(
|
||||
partition_start.pos, partition_end.pos, frame_start.pos, frame_end.pos,
|
||||
partition_start, partition_end, frame_start, frame_end,
|
||||
_fn_place_ptr + _offsets_of_aggregate_states[i], _agg_columns.data(), nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -58,8 +58,8 @@ private:
|
||||
Status _get_next_for_range(RuntimeState* state, Block* block, bool* eos);
|
||||
Status _get_next_for_partition(RuntimeState* state, Block* block, bool* eos);
|
||||
|
||||
void _execute_for_win_func(BlockRowPos partition_start, BlockRowPos partition_end,
|
||||
BlockRowPos frame_start, BlockRowPos frame_end);
|
||||
void _execute_for_win_func(int64_t partition_start, int64_t partition_end, int64_t frame_start,
|
||||
int64_t frame_end);
|
||||
|
||||
Status _reset_agg_status();
|
||||
Status _init_result_columns();
|
||||
@ -80,9 +80,8 @@ private:
|
||||
bool whether_need_next_partition(BlockRowPos found_partition_end);
|
||||
|
||||
std::string debug_window_bound_string(TAnalyticWindowBoundary b);
|
||||
using vectorized_execute =
|
||||
std::function<void(BlockRowPos peer_group_start, BlockRowPos peer_group_end,
|
||||
BlockRowPos frame_start, BlockRowPos frame_end)>;
|
||||
using vectorized_execute = std::function<void(int64_t peer_group_start, int64_t peer_group_end,
|
||||
int64_t frame_start, int64_t frame_end)>;
|
||||
using vectorized_get_next = std::function<Status(RuntimeState* state, Block* block, bool* eos)>;
|
||||
using vectorized_get_result = std::function<void(int64_t current_block_rows)>;
|
||||
using vectorized_closer = std::function<void()>;
|
||||
|
||||
Reference in New Issue
Block a user